[Inference API] Remove worst-case additional 50ms latency for non-rate limited requests #136167
Conversation
Pinging @elastic/ml-core (Team:ML)
Hi @timgrein, I've created a changelog YAML for you.
@timgrein Great work! Would you be able to similarly generate a graphic against a local EIS directly, so that we understand whether any overhead remains? I understand the logic behind the PR, but I'm curious what overhead still exists.
Great work! Left a few suggestions.
public void shutdown() {
    if (shutdown.compareAndSet(false, true)) {
        if (requestQueueTask != null) {
            boolean cancelled = FutureUtils.cancel(requestQueueTask);
If I remember correctly, I think it's up to our implementation to check if it is canceled, so I think we'll get stuck in the queue.take() 🤔 It doesn't seem like FutureUtils.cancel() will do an interrupt.

This is how we've handled that in the past: https://github.com/elastic/elasticsearch/blob/8.13/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java#L253. The shutdown() method there puts a noop task on the queue to ensure that it wakes up.
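For illustration, a minimal sketch of that wake-up pattern, reusing the NoopTask constant introduced elsewhere in this PR (simplified, not the exact code):

```java
public void shutdown() {
    if (shutdown.compareAndSet(false, true)) {
        // Offer a no-op task so a thread blocked in requestQueue.take() wakes up,
        // re-checks isShutdown() and can exit its loop instead of blocking forever.
        requestQueue.offer(NoopTask);
    }
}
```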
Can you double check the tests and make sure we're covering this case (we call shutdown and then await termination)?
Adjusted with Add NoopTask to wake up queue on shutdown.

> Can you double check the tests and make sure we're covering this case (we call shutdown and then await termination)?

AFAIU we always check that when calling submitShutdownRequest, right?
logger.debug("Inference request queue interrupted, exiting"); | ||
} catch (Exception e) { | ||
logger.warn("Error processing task in inference request queue", e); | ||
cleanup(); |
How about we move this to a finally block to ensure it gets called.
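Roughly, a sketch of what that restructuring could look like (simplified, using the existing messages from this class):

```java
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.debug("Inference request queue interrupted, exiting");
} catch (Exception e) {
    logger.warn("Error processing task in inference request queue", e);
} finally {
    // Runs on normal exit and on any exception, so cleanup() is never skipped.
    cleanup();
}
```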
Moved with Cleanup in finally block
    startHandlingRateLimitedTasks();
} catch (Exception e) {
    logger.warn("Failed to start request executor", e);
    cleanup();
I think there's a small potential for an edge case here (if we go the noop task route to do a shutdown for the queue.take()). If an exception occurs in startHandlingRateLimitedTasks(), it could cause the requestQueue to be drained, which could mean that it'd never get the noop task.

I'd have to think of a good way to solve that. Maybe we split up the cleanup methods so that this one doesn't drain the requestQueue; instead the processRequestQueue() would call a different cleanup() that'd handle doing that 🤔
try {
    while (isShutdown() == false) {
        // Blocks the request queue thread until a new request comes in
        var task = (RequestTask) requestQueue.take();
Do we need the cast?
I think it would be better to replace uses of RequestTask with RejectableTask in this class, since there's no reason that the interface can't be used throughout.
var task = (RequestTask) requestQueue.take();

if (isShutdown()) {
    logger.debug("Shutdown requested while handling request tasks, cleaning up");
If we're shutting down we need to reject the task we pulled off the requestQueue. Here's an example of doing that: https://github.com/elastic/elasticsearch/blob/8.13/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java#L192
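A rough sketch of that, reusing the rejectRequest helper from this PR (control flow simplified):

```java
var task = requestQueue.take();
if (isShutdown()) {
    // Don't silently drop a task we've already removed from the queue; tell its
    // listener that the executor is shutting down before exiting the loop.
    rejectRequest(task);
    break;
}
```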
Adjusted with Reject request on shutdown
if (isShutdown()) {
    logger.debug("Shutdown requested while handling request tasks, cleaning up");
    cleanup();
I think if we move this to a finally block we probably don't need it here and we could probably remove the return.
});

endpoint.enqueue(task);
if (taskAccepted == false) {
If the task was accepted, we'll want to check isShutdown() one last time to ensure we notify this task that we're shutting down.
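i.e. something along these lines (a sketch; helper names as used elsewhere in this PR):

```java
boolean taskAccepted = requestQueue.offer(task);
if (taskAccepted && isShutdown()) {
    // The offer succeeded, but shutdown may have started right afterwards and the
    // draining thread may already be gone, so notify this task of the shutdown too.
    rejectRequest(task);
}
```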
    endpoint.enqueue(task);
}

private boolean rateLimitingEnabled(RequestManager requestManager) {
For consistency how about we make this static and accept a RateLimitSettings object? Then we can use it in executeEnqueuedTaskInternal, which has a similar check. Technically executeEnqueuedTaskInternal should never receive a non-rate limited task, but it'd probably be good to check just in case.
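i.e. roughly the following, where the exact RateLimitSettings accessor (isEnabled() here) is an assumption on my side:

```java
private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) {
    // Shared by execute() and executeEnqueuedTaskInternal(); treats absent settings
    // as "rate limiting disabled". Accessor name is illustrative.
    return rateLimitSettings != null && rateLimitSettings.isEnabled();
}
```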
Good point! Adjusted with Reuse rateLimitSettingsEnabled check
Do you mean on my local machine with "local EIS"? If so, ES and EIS ran on my local machine for the results you see above.
    // Reject non-rate-limited requests
    List<RejectableTask> requests = new ArrayList<>();
    requestQueue.drainTo(requests);

    for (var request : requests) {
        rejectRequest(request);
    }
}

private void rejectRequest(RejectableTask task) {
    try {
        task.onRejection(
            new EsRejectedExecutionException(
                format(
                    "Failed to send request for inference id [%s] has shutdown prior to executing request",
                    task.getRequestManager().inferenceEntityId()
                ),
                true
            )
        );
    } catch (Exception e) {
        logger.warn(
            format(
                "Failed to notify request for inference id [%s] of rejection after executor service shutdown",
                task.getRequestManager().inferenceEntityId()
            )
        );
    }
This code is duplicated in RateLimitingEndpointHandler, with the only difference being the format of the error message. Would it be possible to extract it to a static method that both places could call? Rather than needing to use the id field directly for the service grouping, it can be derived from the task: Integer.toString(task.getRequestManager().rateLimitGrouping().hashCode())

There are also quite a few other places where we reject tasks due to shutdown that could be extracted to a method call to reduce code duplication and ensure consistency for the error messages.
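For example, something like this shared helper (a sketch; the name and exact log message are just illustrative):

```java
private static void rejectTaskAfterShutdown(RejectableTask task, String rejectionMessage) {
    try {
        task.onRejection(new EsRejectedExecutionException(rejectionMessage, true));
    } catch (Exception e) {
        // The listener itself failed; log and move on so the draining loop keeps going.
        logger.warn(format("Failed to notify task of rejection after shutdown: %s", rejectionMessage), e);
    }
}
```

Each call site would then only differ in the message it builds (inference entity id vs. rate limit grouping hash).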
I think the id (inferenceEntityId being the name of the inference endpoint AFAIU) and the service grouping are two different things, so IMO we should keep id in the error messages so it's clear which endpoint failed to execute a request. I think the hashCode of the rateLimitGrouping could become quite cryptic for a clear log/error message.

But nothing speaks against extracting the common part to a static method 👍
Sorry for the confusion, the id field in RateLimitingEndpointHandler is the service grouping hash code. The field is not well named in that respect.
thrownException.getMessage(),
is(
    Strings.format(
        "Failed to send request, request service [3355] for inference id [id] has shutdown prior to executing request",
This should be service [%s] for inference
I think it actually is correct error message formatting: AFAIU hashCode is simply executed on the inferenceEntityId, leading to 3355 in this case.
What I mean is that you're using String.format() to construct the error message and passing requestManager.rateLimitGrouping().hashCode() as the second argument to it, but the string in the first argument doesn't have any placeholders for that value to be used. Right now the test is passing because the rate limit grouping is a constant from run to run, but if it ever changes, then the hard-coded hash code of 3355 will no longer be correct and the test will fail.
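i.e. the assertion would end up roughly like this, so the expected message tracks whatever the grouping actually hashes to:

```java
assertThat(
    thrownException.getMessage(),
    is(
        Strings.format(
            "Failed to send request, request service [%s] for inference id [id] has shutdown prior to executing request",
            requestManager.rateLimitGrouping().hashCode()
        )
    )
);
```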
Ah got it now, adjusted with Use string placeholder in assertion
service.submitTaskToRateLimitedExecutionPath(
    new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener)
);
Is this change necessary? The test still passes if execute() is called instead of submitTaskToRateLimitedExecutionPath().

Also, while I know you didn't add it in this PR, this test's name is at odds with what it's actually asserting. The name implies that we don't expect an exception when polling the queue to cause the service to terminate, but the test explicitly terminates the service as part of throwing the exception, then asserts that it's terminated. Is this test actually testing anything other than that calling shutdown() causes the service to shut down?
Adjusted the test to use execute again.

> Is this test actually testing anything other than that calling shutdown() causes the service to shut down?

Good point, I've adjusted the test to assert that the service still runs after task.execute(...) threw an exception. I've used task.execute(...) instead of queue.poll(), because poll() is also used internally in AdjustableCapacityBlockingQueue when calling requestQueue.take() in processRequestQueue, which did in fact terminate the service and made the test green for the wrong reason. Commit: Adjust test to check that a throwing task does not terminate the service. (I'll add a similar test for the non-rate-limited execution path.)

Speaking of that: executeTaskImmediately inside processRequestQueue handles any exceptions thrown by the processed request task without terminating the service. AFAIU take() shouldn't throw except when its calling thread is interrupted, which we need to handle explicitly anyway. I've adjusted the error message in the general exception handler with Adjust error message in general exception handler to reflect that an exception there is not coming from a task or a task rejection, but is potentially a more fundamental issue leading to service termination. Just wanted to double-check that my reasoning is correct here and that we want this behavior?
    endpointHandler.init();
    return endpointHandler;
});
boolean taskAccepted = requestQueue.offer(task);
If I understand correctly, prior to this change, when execute() was called we would retrieve (or create) a RateLimitingEndpointHandler appropriate to the task's rate limit settings, then add the task to the queue associated with that handler. This meant that there was one queue per RateLimitingEndpointHandler, each with a capacity defined by the RequestExecutorServiceSettings, and that we wouldn't reject a task unless the queue for the handler associated with it was full.

With the new change, all tasks are first submitted to the single requestQueue, which has the same capacity as each of the queues managed by the handlers, meaning that we can begin rejecting tasks even though none of the queues associated with the handlers are full, effectively reducing the total number of requests we can process in a given time.

Would it be better to instead only add tasks with no rate limit settings to the request queue, and call submitTaskToRateLimitedExecutionPath() in the execute() method for tasks with rate limit settings? That way, we're not adding rate limited tasks to one queue just to later remove them and add them to another queue.
That's a very valid point @DonalEvans! I'll adjust the implementation, thanks for flagging
rejectRequest(
    task,
    format("Failed to send request for inference id [%s] has shutdown prior to executing request", inferenceEntityId),
This message might be better as "Failed to send request for inference id [%s] because the request executor service has been shutdown" to make it consistent with the error we report in execute() if we try to queue a task when we're shut down.
task,
format(
    "Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request",
    id,
Not strictly related to this PR, but since you're making changes in this class, could you rename the id field on RateLimitingEndpointHandler to be something more descriptive, like rateLimitGroupingId?
        requestManager.rateLimitGrouping().hashCode()
    )
)
is("Failed to execute task for inference id [id] because the request service [3355] queue is full")
This should be using Strings.format() with [3355] replaced with [%s] and the other argument being requestManager.rateLimitGrouping().hashCode().
@SuppressWarnings("unchecked") | ||
var stubbing = when(mockExecutorService.submit(any(Runnable.class))).thenReturn(mock(Future.class)); |
This suppression and unused variable can be avoided using the thenAnswer() method, which I think is a little cleaner:

when(mockExecutorService.submit(any(Runnable.class))).thenAnswer(i -> mock(Future.class));
}

if (rateLimitingEnabled(requestManager.rateLimitSettings())) {
if (isEmbeddingsIngestInput(inferenceInputs)) {
The rateLimitingEnabled check should be retained:
if (isEmbeddingsIngestInput(inferenceInputs) || rateLimitingEnabled(requestManager.rateLimitSettings())) {
    }
}

private static final RejectableTask NoopTask = new RejectableTask() {
nit: When I first saw the comparison I thought this was a class name. How about we make this all caps or noopTask.
With this PR I've added a new request queue to the inference API, which immediately executes requests that are not ingest embeddings requests (e.g. embeddings generation on the search path, rerank, etc.). Otherwise the requests are simply submitted to the rate-limited execution path as before. This request queue is processed in a separate thread, which blocks until a new request becomes available, avoiding explicit polling. This removes an additional worst-case latency of xpack.inference.http.request_executor.task_poll_frequency (default: 50ms), which we observed when generating sparse text embeddings using EIS.

I've verified the improved latency by running ES (with and without the changes introduced by this PR) and EIS locally and executing requests using vegeta.

Without optimization:

With optimization:
