Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134178.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134178
summary: Return 429 status when `RequestExecutorService` queue full
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.inference.InferenceServiceResults;
import org.elasticsearch.rest.RestStatus;

public class ActionUtils {

Expand All @@ -28,19 +27,17 @@ public static ActionListener<InferenceServiceResults> wrapFailuresInElasticsearc
l.onFailure(esException);
} else {
l.onFailure(
createInternalServerError(
unwrappedException,
Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage())
// Determine the appropriate RestStatus from the unwrapped exception, then wrap in an ElasticsearchStatusException
new ElasticsearchStatusException(
Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()),
ExceptionsHelper.status(unwrappedException),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 that's a neat solution

unwrappedException
)
);
}
});
}

public static ElasticsearchStatusException createInternalServerError(Throwable e, String message) {
return new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, e);
}

public static String constructFailedToSendRequestMessage(String message) {
return Strings.format("Failed to send %s request", message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.TimeValue;
Expand All @@ -27,7 +26,6 @@
import java.util.Objects;

import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;

public class AlibabaCloudSearchCompletionAction implements ExecutableAction {
Expand Down Expand Up @@ -61,16 +59,14 @@ public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionLi
return;
}

ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These Alibaba classes are the outlier everything else is done by SenderExecutableAction. Please can you convert AlibabaCloudSearchActionCreator to produce SenderExecutableAction and remove these classes. You will probably have to keep AlibabaCloudSearchCompletionAction as that has extra logic in it to check the inputs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix this in another PR, since it's pretty far out of scope for this one.

failedToSendRequestErrorMessage,
listener
);
try {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
} catch (ElasticsearchException e) {
listener.onFailure(e);
} catch (Exception e) {
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
wrappedListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.inference.InferenceServiceResults;
Expand All @@ -22,7 +21,6 @@
import java.util.Objects;

import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;

public class AlibabaCloudSearchEmbeddingsAction implements ExecutableAction {
Expand All @@ -42,16 +40,14 @@ public AlibabaCloudSearchEmbeddingsAction(Sender sender, AlibabaCloudSearchEmbed

@Override
public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
try {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
} catch (ElasticsearchException e) {
listener.onFailure(e);
} catch (Exception e) {
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
wrappedListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.inference.InferenceServiceResults;
Expand All @@ -24,7 +23,6 @@
import java.util.Objects;

import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;

public class AlibabaCloudSearchRerankAction implements ExecutableAction {
Expand All @@ -46,16 +44,14 @@ public AlibabaCloudSearchRerankAction(Sender sender, AlibabaCloudSearchRerankMod

@Override
public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
try {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
} catch (ElasticsearchException e) {
listener.onFailure(e);
} catch (Exception e) {
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
wrappedListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.inference.InferenceServiceResults;
Expand All @@ -24,7 +23,6 @@
import java.util.Objects;

import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;

public class AlibabaCloudSearchSparseAction implements ExecutableAction {
Expand All @@ -46,16 +44,14 @@ public AlibabaCloudSearchSparseAction(Sender sender, AlibabaCloudSearchSparseMod

@Override
public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
try {
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
failedToSendRequestErrorMessage,
listener
);
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
} catch (ElasticsearchException e) {
listener.onFailure(e);
} catch (Exception e) {
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
wrappedListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.inference.InferenceServiceResults;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.inference.external.http.sender.InferenceInputs;
import org.elasticsearch.xpack.inference.external.http.sender.RequestManager;
Expand Down Expand Up @@ -89,31 +91,67 @@ public void testSenderReturnedElasticsearchExceptionIsUnwrapped() {
@SuppressWarnings("unchecked")
public void testSendThrowingExceptionIsWrapped() {
var expectedException = new IllegalStateException("test");
var actualException = new AtomicReference<Exception>();
var actualExceptionReference = new AtomicReference<Exception>();

doThrow(expectedException).when(sender)
.send(eq(requestManager), any(InferenceInputs.class), any(TimeValue.class), any(ActionListener.class));

execute(actualException);
execute(actualExceptionReference);

assertThat(actualException.get(), notNullValue());
assertThat(actualException.get().getMessage(), is(failureExceptionMessage));
assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class));
assertThat(actualException.get().getCause(), sameInstance(expectedException));
Exception actualException = actualExceptionReference.get();
assertThat(actualException, notNullValue());
assertThat(actualException.getMessage(), is(failureExceptionMessage));
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
assertThat(actualException.getCause(), sameInstance(expectedException));
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR));
}

public void testSenderReturnedExceptionIsWrapped() {
var expectedException = new IllegalStateException("test");
var actualException = new AtomicReference<Exception>();
var actualExceptionReference = new AtomicReference<Exception>();

mockSender(listener -> listener.onFailure(expectedException));

execute(actualException);
execute(actualExceptionReference);

assertThat(actualException.get(), notNullValue());
assertThat(actualException.get().getMessage(), is(failureExceptionMessage));
assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class));
assertThat(actualException.get().getCause(), sameInstance(expectedException));
Exception actualException = actualExceptionReference.get();
assertThat(actualException, notNullValue());
assertThat(actualException.getMessage(), is(failureExceptionMessage));
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
assertThat(actualException.getCause(), sameInstance(expectedException));
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR));
}

public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsIllegalArgumentException() {
var expectedException = new IllegalArgumentException("test");
var actualExceptionReference = new AtomicReference<Exception>();

mockSender(listener -> listener.onFailure(expectedException));

execute(actualExceptionReference);

Exception actualException = actualExceptionReference.get();
assertThat(actualException, notNullValue());
assertThat(actualException.getMessage(), is(failureExceptionMessage));
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
assertThat(actualException.getCause(), sameInstance(expectedException));
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.BAD_REQUEST));
}

public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsEsRejectedExecutionException() {
var expectedException = new EsRejectedExecutionException("test");
var actualExceptionReference = new AtomicReference<Exception>();

mockSender(listener -> listener.onFailure(expectedException));

execute(actualExceptionReference);

Exception actualException = actualExceptionReference.get();
assertThat(actualException, notNullValue());
assertThat(actualException.getMessage(), is(failureExceptionMessage));
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
assertThat(actualException.getCause(), sameInstance(expectedException));
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.TOO_MANY_REQUESTS));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import static org.elasticsearch.xpack.core.inference.results.ChatCompletionResultsTests.buildExpectationCompletion;
import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors;
import static org.elasticsearch.xpack.inference.Utils.mockClusterServiceEmpty;
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
import static org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettingsTests.getSecretSettingsMap;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -110,8 +109,8 @@ public void testExecute_ListenerThrowsInternalServerError_WhenSenderThrowsExcept
PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
action.execute(new ChatCompletionInput(List.of(randomAlphaOfLength(10))), InferenceAction.Request.DEFAULT_TIMEOUT, listener);

var thrownException = expectThrows(ElasticsearchException.class, () -> listener.actionGet(TIMEOUT));
assertThat(thrownException.getMessage(), is(constructFailedToSendRequestMessage("AlibabaCloud Search completion")));
var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT));
assertThat(thrownException.getMessage(), is("Failed to send AlibabaCloud Search completion request. Cause: error"));
}

public void testExecute_ThrowsIllegalArgumentException_WhenInputIsNotChatCompletionInput() {
Expand Down