From 05a2fe0ed3b6175a4a1dd03a98f544293b6d504e Mon Sep 17 00:00:00 2001 From: donalevans Date: Thu, 4 Sep 2025 15:49:53 -0700 Subject: [PATCH 1/4] Return 429 status when RequestExecutorService queue full Instead of returning a 500 (internal server error) response when the RequestExecutorService queue is full and a new request is submitted, return a 429 (too many requests) response. - Wrap the existing EsRejectedExecutionException in an ElasticsearchStatusException before throwing - Update existing tests for new behaviour --- .../http/sender/RequestExecutorService.java | 12 ++++- .../sender/RequestExecutorServiceTests.java | 46 +++++++++---------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index 55b59e3fd1d9f..d7f6bd9480866 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -16,6 +17,7 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue; @@ -519,7 +521,15 @@ public void enqueue(RequestTask task) { false ); - task.onRejection(rejected); + ElasticsearchStatusException statusException = new ElasticsearchStatusException( + format( + "Failed to execute task for inference id [%s] because the request service queue is full", + task.getRequestManager().inferenceEntityId() + ), + RestStatus.TOO_MANY_REQUESTS, + rejected + ); + task.onRejection(statusException); } else if (isShutdown()) { notifyRequestsOfShutdown(); } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 163c4b84f1780..523fd4e8e1b2d 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS; import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors; import static org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueueTests.mockQueueCreator; import static org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorServiceSettingsTests.createRequestExecutorServiceSettings; @@ -189,22 +190,21 @@ public void testExecute_Throws_WhenQueueIsFull() { new PlainActionFuture<>() ); - var requestManager = RequestManagerTests.createMock("id"); + String inferenceEntityId = "id"; + var requestManager = RequestManagerTests.createMock(inferenceEntityId); var listener = new PlainActionFuture(); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); - var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); + var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is( - Strings.format( - "Failed to execute task for inference id [id] because the request service [%s] queue is full", - requestManager.rateLimitGrouping().hashCode() - ) - ) + is(Strings.format("Failed to execute task for inference id [%s] because the request service queue is full", inferenceEntityId)) ); - assertFalse(thrownException.isExecutorShutdown()); + assertThat(thrownException.status(), is(TOO_MANY_REQUESTS)); + Throwable cause = thrownException.getCause(); + assertThat(cause, is(instanceOf(EsRejectedExecutionException.class))); + assertThat(((EsRejectedExecutionException) cause).isExecutorShutdown(), is(false)); } public void testTaskThrowsError_CallsOnFailure() throws InterruptedException { @@ -396,19 +396,17 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); - var requestManager = RequestManagerTests.createMock(requestSender, "id"); + String inferenceEntityId = "id"; + var requestManager = RequestManagerTests.createMock(requestSender, inferenceEntityId); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); - var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); + var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is( - Strings.format( - "Failed to execute task for inference id [id] because the request service [%s] queue is full", - requestManager.rateLimitGrouping().hashCode() - ) - ) + is(Strings.format("Failed to execute task for inference id [%s] because the request service queue is full", inferenceEntityId)) ); + assertThat(thrownException.status(), is(TOO_MANY_REQUESTS)); + assertThat(thrownException.getCause(), is(instanceOf(EsRejectedExecutionException.class))); settings.setQueueCapacity(2); @@ -503,23 +501,21 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); + String inferenceEntityId = "id"; service.execute( - RequestManagerTests.createMock(requestSender, "id"), + RequestManagerTests.createMock(requestSender, inferenceEntityId), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener ); - var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); + var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is( - Strings.format( - "Failed to execute task for inference id [id] because the request service [%s] queue is full", - requestManager.rateLimitGrouping().hashCode() - ) - ) + is(Strings.format("Failed to execute task for inference id [%s] because the request service queue is full", inferenceEntityId)) ); + assertThat(thrownException.status(), is(TOO_MANY_REQUESTS)); + assertThat(thrownException.getCause(), is(instanceOf(EsRejectedExecutionException.class))); settings.setQueueCapacity(0); From 82104aae1147028093a3d8c62303a7f911572319 Mon Sep 17 00:00:00 2001 From: donalevans Date: Fri, 5 Sep 2025 09:56:47 -0700 Subject: [PATCH 2/4] Revert "Return 429 status when RequestExecutorService queue full" This reverts commit 05a2fe0ed3b6175a4a1dd03a98f544293b6d504e. --- .../http/sender/RequestExecutorService.java | 12 +---- .../sender/RequestExecutorServiceTests.java | 46 ++++++++++--------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java index d7f6bd9480866..55b59e3fd1d9f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -17,7 +16,6 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue; @@ -521,15 +519,7 @@ public void enqueue(RequestTask task) { false ); - ElasticsearchStatusException statusException = new ElasticsearchStatusException( - format( - "Failed to execute task for inference id [%s] because the request service queue is full", - task.getRequestManager().inferenceEntityId() - ), - RestStatus.TOO_MANY_REQUESTS, - rejected - ); - task.onRejection(statusException); + task.onRejection(rejected); } else if (isShutdown()) { notifyRequestsOfShutdown(); } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java index 523fd4e8e1b2d..163c4b84f1780 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeoutException; import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS; import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors; import static org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueueTests.mockQueueCreator; import static org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorServiceSettingsTests.createRequestExecutorServiceSettings; @@ -190,21 +189,22 @@ public void testExecute_Throws_WhenQueueIsFull() { new PlainActionFuture<>() ); - String inferenceEntityId = "id"; - var requestManager = RequestManagerTests.createMock(inferenceEntityId); + var requestManager = RequestManagerTests.createMock("id"); var listener = new PlainActionFuture(); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); - var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); + var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is(Strings.format("Failed to execute task for inference id [%s] because the request service queue is full", inferenceEntityId)) + is( + Strings.format( + "Failed to execute task for inference id [id] because the request service [%s] queue is full", + requestManager.rateLimitGrouping().hashCode() + ) + ) ); - assertThat(thrownException.status(), is(TOO_MANY_REQUESTS)); - Throwable cause = thrownException.getCause(); - assertThat(cause, is(instanceOf(EsRejectedExecutionException.class))); - assertThat(((EsRejectedExecutionException) cause).isExecutorShutdown(), is(false)); + assertFalse(thrownException.isExecutorShutdown()); } public void testTaskThrowsError_CallsOnFailure() throws InterruptedException { @@ -396,17 +396,19 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException, assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); - String inferenceEntityId = "id"; - var requestManager = RequestManagerTests.createMock(requestSender, inferenceEntityId); + var requestManager = RequestManagerTests.createMock(requestSender, "id"); service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener); - var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); + var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is(Strings.format("Failed to execute task for inference id [%s] because the request service queue is full", inferenceEntityId)) + is( + Strings.format( + "Failed to execute task for inference id [id] because the request service [%s] queue is full", + requestManager.rateLimitGrouping().hashCode() + ) + ) ); - assertThat(thrownException.status(), is(TOO_MANY_REQUESTS)); - assertThat(thrownException.getCause(), is(instanceOf(EsRejectedExecutionException.class))); settings.setQueueCapacity(2); @@ -501,21 +503,23 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO assertThat(service.queueSize(), is(1)); PlainActionFuture listener = new PlainActionFuture<>(); - String inferenceEntityId = "id"; service.execute( - RequestManagerTests.createMock(requestSender, inferenceEntityId), + RequestManagerTests.createMock(requestSender, "id"), new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener ); - var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); + var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT)); assertThat( thrownException.getMessage(), - is(Strings.format("Failed to execute task for inference id [%s] because the request service queue is full", inferenceEntityId)) + is( + Strings.format( + "Failed to execute task for inference id [id] because the request service [%s] queue is full", + requestManager.rateLimitGrouping().hashCode() + ) + ) ); - assertThat(thrownException.status(), is(TOO_MANY_REQUESTS)); - assertThat(thrownException.getCause(), is(instanceOf(EsRejectedExecutionException.class))); settings.setQueueCapacity(0); From 7d5059c403cbc145d14d1207afc5c3a26741b3f2 Mon Sep 17 00:00:00 2001 From: donalevans Date: Fri, 5 Sep 2025 11:14:30 -0700 Subject: [PATCH 3/4] Determine appropriate status for wrapped exception Rather than always throwing an ElasticsearchStatusException with 500 status in ActionUtils.wrapFailuresInElasticsearchException(), determine the appropriate status from the unwrapped exception - Remove createInternalServerError() method from ActionUtils - Refactor AlibabaCloudSearch*Action classes to be consistent with SenderExecutableAction - Update tests to account for new behaviour --- .../external/action/ActionUtils.java | 13 ++-- .../AlibabaCloudSearchCompletionAction.java | 14 ++--- .../AlibabaCloudSearchEmbeddingsAction.java | 14 ++--- .../AlibabaCloudSearchRerankAction.java | 14 ++--- .../AlibabaCloudSearchSparseAction.java | 14 ++--- .../action/SenderExecutableActionTests.java | 62 +++++++++++++++---- ...ibabaCloudSearchCompletionActionTests.java | 5 +- 7 files changed, 77 insertions(+), 59 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java index 61cada6e75ef1..03b94b3d49676 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.inference.InferenceServiceResults; -import org.elasticsearch.rest.RestStatus; public class ActionUtils { @@ -28,19 +27,17 @@ public static ActionListener wrapFailuresInElasticsearc l.onFailure(esException); } else { l.onFailure( - createInternalServerError( - unwrappedException, - Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()) + // Determine the appropriate RestStatus from the unwrapped exception, then wrap in an ElasticsearchStatusException + new ElasticsearchStatusException( + Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()), + ExceptionsHelper.status(unwrappedException), + unwrappedException ) ); } }); } - public static ElasticsearchStatusException createInternalServerError(Throwable e, String message) { - return new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, e); - } - public static String constructFailedToSendRequestMessage(String message) { return Strings.format("Failed to send %s request", message); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java index ed15871a2eb69..a45c42f7d933f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; @@ -27,7 +26,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchCompletionAction implements ExecutableAction { @@ -61,16 +59,14 @@ public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionLi return; } + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java index a7839f85ca417..473d9b7876845 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -22,7 +21,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchEmbeddingsAction implements ExecutableAction { @@ -42,16 +40,14 @@ public AlibabaCloudSearchEmbeddingsAction(Sender sender, AlibabaCloudSearchEmbed @Override public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener listener) { + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java index 9517eb736c87f..fd91bcee41c8f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -24,7 +23,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchRerankAction implements ExecutableAction { @@ -46,16 +44,14 @@ public AlibabaCloudSearchRerankAction(Sender sender, AlibabaCloudSearchRerankMod @Override public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener listener) { + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java index 5d74e91d21d49..dc4d628325b4d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -24,7 +23,6 @@ import java.util.Objects; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError; import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException; public class AlibabaCloudSearchSparseAction implements ExecutableAction { @@ -46,16 +44,14 @@ public AlibabaCloudSearchSparseAction(Sender sender, AlibabaCloudSearchSparseMod @Override public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener listener) { + ActionListener wrappedListener = wrapFailuresInElasticsearchException( + failedToSendRequestErrorMessage, + listener + ); try { - ActionListener wrappedListener = wrapFailuresInElasticsearchException( - failedToSendRequestErrorMessage, - listener - ); sender.send(requestCreator, inferenceInputs, timeout, wrappedListener); - } catch (ElasticsearchException e) { - listener.onFailure(e); } catch (Exception e) { - listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage)); + wrappedListener.onFailure(e); } } } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java index 4d88eabb105db..30ef1ca71cedf 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java @@ -10,8 +10,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.inference.external.http.sender.InferenceInputs; import org.elasticsearch.xpack.inference.external.http.sender.RequestManager; @@ -89,31 +91,67 @@ public void testSenderReturnedElasticsearchExceptionIsUnwrapped() { @SuppressWarnings("unchecked") public void testSendThrowingExceptionIsWrapped() { var expectedException = new IllegalStateException("test"); - var actualException = new AtomicReference(); + var actualExceptionReference = new AtomicReference(); doThrow(expectedException).when(sender) .send(eq(requestManager), any(InferenceInputs.class), any(TimeValue.class), any(ActionListener.class)); - execute(actualException); + execute(actualExceptionReference); - assertThat(actualException.get(), notNullValue()); - assertThat(actualException.get().getMessage(), is(failureExceptionMessage)); - assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class)); - assertThat(actualException.get().getCause(), sameInstance(expectedException)); + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR)); } public void testSenderReturnedExceptionIsWrapped() { var expectedException = new IllegalStateException("test"); - var actualException = new AtomicReference(); + var actualExceptionReference = new AtomicReference(); mockSender(listener -> listener.onFailure(expectedException)); - execute(actualException); + execute(actualExceptionReference); - assertThat(actualException.get(), notNullValue()); - assertThat(actualException.get().getMessage(), is(failureExceptionMessage)); - assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class)); - assertThat(actualException.get().getCause(), sameInstance(expectedException)); + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsIllegalArgumentException() { + var expectedException = new IllegalArgumentException("test"); + var actualExceptionReference = new AtomicReference(); + + mockSender(listener -> listener.onFailure(expectedException)); + + execute(actualExceptionReference); + + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.BAD_REQUEST)); + } + + public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsEsRejectedExecutionException() { + var expectedException = new EsRejectedExecutionException("test"); + var actualExceptionReference = new AtomicReference(); + + mockSender(listener -> listener.onFailure(expectedException)); + + execute(actualExceptionReference); + + Exception actualException = actualExceptionReference.get(); + assertThat(actualException, notNullValue()); + assertThat(actualException.getMessage(), is(failureExceptionMessage)); + assertThat(actualException, instanceOf(ElasticsearchStatusException.class)); + assertThat(actualException.getCause(), sameInstance(expectedException)); + assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.TOO_MANY_REQUESTS)); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java index 6622590cb1ba8..b129c225cf091 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java @@ -42,7 +42,6 @@ import static org.elasticsearch.xpack.core.inference.results.ChatCompletionResultsTests.buildExpectationCompletion; import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors; import static org.elasticsearch.xpack.inference.Utils.mockClusterServiceEmpty; -import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage; import static org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettingsTests.getSecretSettingsMap; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; @@ -110,8 +109,8 @@ public void testExecute_ListenerThrowsInternalServerError_WhenSenderThrowsExcept PlainActionFuture listener = new PlainActionFuture<>(); action.execute(new ChatCompletionInput(List.of(randomAlphaOfLength(10))), InferenceAction.Request.DEFAULT_TIMEOUT, listener); - var thrownException = expectThrows(ElasticsearchException.class, () -> listener.actionGet(TIMEOUT)); - assertThat(thrownException.getMessage(), is(constructFailedToSendRequestMessage("AlibabaCloud Search completion"))); + var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT)); + assertThat(thrownException.getMessage(), is("Failed to send AlibabaCloud Search completion request. Cause: error")); } public void testExecute_ThrowsIllegalArgumentException_WhenInputIsNotChatCompletionInput() { From ade8984900a0806be967160ac964037798f04395 Mon Sep 17 00:00:00 2001 From: Donal Evans Date: Fri, 5 Sep 2025 12:17:07 -0700 Subject: [PATCH 4/4] Update docs/changelog/134178.yaml --- docs/changelog/134178.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/134178.yaml diff --git a/docs/changelog/134178.yaml b/docs/changelog/134178.yaml new file mode 100644 index 0000000000000..aaf002310dcd3 --- /dev/null +++ b/docs/changelog/134178.yaml @@ -0,0 +1,5 @@ +pr: 134178 +summary: Return 429 status when `RequestExecutorService` queue full +area: Machine Learning +type: bug +issues: []