diff --git a/docs/changelog/134178.yaml b/docs/changelog/134178.yaml new file mode 100644 index 0000000000000..aaf002310dcd3 --- /dev/null +++ b/docs/changelog/134178.yaml @@ -0,0 +1,5 @@ +pr: 134178 +summary: Return 429 status when `RequestExecutorService` queue full +area: Machine Learning +type: bug +issues: [] diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java index 61cada6e75ef1..03b94b3d49676 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.inference.InferenceServiceResults; -import org.elasticsearch.rest.RestStatus; public class ActionUtils { @@ -28,19 +27,17 @@ public static ActionListener wrapFailuresInElasticsearc l.onFailure(esException); } else { l.onFailure( - createInternalServerError( - unwrappedException, - Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()) + // Determine the appropriate RestStatus from the unwrapped exception, then wrap in an ElasticsearchStatusException + new ElasticsearchStatusException( + Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()), + ExceptionsHelper.status(unwrappedException), + unwrappedException ) ); } }); } - public static ElasticsearchStatusException createInternalServerError(Throwable e, String message) { - return new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, e); - } - public static String constructFailedToSendRequestMessage(String message) { return Strings.format("Failed to send %s request", message); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java index ed15871a2eb69..a45c42f7d933f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; @@ -27,7 +26,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchCompletionAction implements ExecutableAction { @@ -61,16 +59,14 @@ public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionLi return; } + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java index a7839f85ca417..473d9b7876845 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -22,7 +21,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchEmbeddingsAction implements ExecutableAction { @@ -42,16 +40,14 @@ public AlibabaCloudSearchEmbeddingsAction(Sender sender, AlibabaCloudSearchEmbed @Override public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener listener) { + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java index 9517eb736c87f..fd91bcee41c8f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -24,7 +23,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchRerankAction implements ExecutableAction { @@ -46,16 +44,14 @@ public AlibabaCloudSearchRerankAction(Sender sender, AlibabaCloudSearchRerankMod @Override public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener listener) { + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java index 5d74e91d21d49..dc4d628325b4d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -24,7 +23,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchSparseAction implements ExecutableAction { @@ -46,16 +44,14 @@ public AlibabaCloudSearchSparseAction(Sender sender, AlibabaCloudSearchSparseMod @Override public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener listener) { + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java index 4d88eabb105db..30ef1ca71cedf 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java @@ -10,8 +10,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.inference.external.http.sender.InferenceInputs; import org.elasticsearch.xpack.inference.external.http.sender.RequestManager; @@ -89,31 +91,67 @@ public void testSenderReturnedElasticsearchExceptionIsUnwrapped() { @SuppressWarnings("unchecked") public void testSendThrowingExceptionIsWrapped() { var expectedException = new IllegalStateException("test"); - var actualException = new AtomicReference(); + var actualExceptionReference = new AtomicReference(); doThrow(expectedException).when(sender) .send(eq(requestManager), any(InferenceInputs.class), any(TimeValue.class), any(ActionListener.class)); - execute(actualException); + execute(actualExceptionReference); - assertThat(actualException.get(), notNullValue()); - assertThat(actualException.get().getMessage(), is(failureExceptionMessage)); - assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class)); - assertThat(actualException.get().getCause(), sameInstance(expectedException)); + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR)); } public void testSenderReturnedExceptionIsWrapped() { var expectedException = new IllegalStateException("test"); - var actualException = new AtomicReference(); + var actualExceptionReference = new AtomicReference(); mockSender(listener -> listener.onFailure(expectedException)); - execute(actualException); + execute(actualExceptionReference); - assertThat(actualException.get(), notNullValue()); - assertThat(actualException.get().getMessage(), is(failureExceptionMessage)); - assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class)); - assertThat(actualException.get().getCause(), sameInstance(expectedException)); + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsIllegalArgumentException() { + var expectedException = new IllegalArgumentException("test"); + var actualExceptionReference = new AtomicReference(); + + mockSender(listener -> listener.onFailure(expectedException)); + + execute(actualExceptionReference); + + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.BAD_REQUEST)); + } + + public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsEsRejectedExecutionException() { + var expectedException = new EsRejectedExecutionException("test"); + var actualExceptionReference = new AtomicReference(); + + mockSender(listener -> listener.onFailure(expectedException)); + + execute(actualExceptionReference); + + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.TOO_MANY_REQUESTS)); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java index 6622590cb1ba8..b129c225cf091 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java @@ -42,7 +42,6 @@ import static org.elasticsearch.xpack.core.inference.results.ChatCompletionResultsTests.buildExpectationCompletion; import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors; import static org.elasticsearch.xpack.inference.Utils.mockClusterServiceEmpty; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; import static org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettingsTests.getSecretSettingsMap; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; @@ -110,8 +109,8 @@ public void testExecute_ListenerThrowsInternalServerError_WhenSenderThrowsExcept PlainActionFuture listener = new PlainActionFuture<>(); action.execute(new ChatCompletionInput(List.of(randomAlphaOfLength(10))), InferenceAction.Request.DEFAULT_TIMEOUT, listener); - var thrownException = expectThrows(ElasticsearchException.class, () -> listener.actionGet(TIMEOUT)); - assertThat(thrownException.getMessage(), is(constructFailedToSendRequestMessage("AlibabaCloud Search completion"))); + var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); + assertThat(thrownException.getMessage(), is("Failed to send AlibabaCloud Search completion request. Cause: error")); } public void testExecute_ThrowsIllegalArgumentException_WhenInputIsNotChatCompletionInput() {