Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f1e7419
Remove worst-case additional 50ms latency for non-rate limited requests
timgrein Oct 8, 2025
a326e31
Update docs/changelog/136167.yaml
timgrein Oct 8, 2025
191029f
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 8, 2025
edaa0f0
Do not use forbidden API
timgrein Oct 8, 2025
57c5605
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 8, 2025
f98b82e
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 8, 2025
f17ec00
Move startRequestQueueTask before start signal
timgrein Oct 8, 2025
90ee1a1
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 8, 2025
a9e7610
Cleanup in finally block
timgrein Oct 8, 2025
ec513be
Reject request on shutdown
timgrein Oct 8, 2025
174526c
Reuse rateLimitSettingsEnabled check
timgrein Oct 8, 2025
f506cb3
Add NoopTask to wake up queue on shutdown
timgrein Oct 9, 2025
ae349fd
Only add non-rate-limited tasks to fast-path request queue
timgrein Oct 9, 2025
540f49d
Extract rejection logic to common static method
timgrein Oct 9, 2025
0dca88a
Remove unnecessary cast
timgrein Oct 10, 2025
0590561
Use string placeholder in assertion
timgrein Oct 10, 2025
2e65475
Adjust test to check that a throwing task does not terminate the service
timgrein Oct 10, 2025
4fb2372
Adjust error message in general exception handler
timgrein Oct 10, 2025
2930151
Adjust warn to error
timgrein Oct 10, 2025
b2fd85f
Adjust error message when request gets rejected
timgrein Oct 13, 2025
91f387a
Rename id in RateLimitingEndpointHandler to rateLimitGroupingId
timgrein Oct 13, 2025
90e672b
Use Strings.format(...) in assertion
timgrein Oct 13, 2025
1cf24dc
Use thenAnswer instead of suppression
timgrein Oct 13, 2025
a92a7c0
Only reject requests of the respective execution path (rate-limited v…
timgrein Oct 13, 2025
8e52c22
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 14, 2025
a868152
Submit only ingest embeddings requests to rate-limited execution path
timgrein Oct 14, 2025
dd53fcb
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 14, 2025
c575eba
Add rate limiting check
timgrein Oct 14, 2025
69db0e1
Make NoopTask all caps
timgrein Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/136167.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 136167
summary: "[Inference API] Remove worst-case additional 50ms latency for non-rate limited\
\ requests"
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -53,6 +55,29 @@
* {@link org.apache.http.client.methods.HttpUriRequest} to set a timeout for how long this executor will wait
* attempting to execute a task (aka waiting for the connection manager to lease a connection). See
* {@link org.apache.http.client.config.RequestConfig.Builder#setConnectionRequestTimeout} for more info.
*
* The request flow looks as follows:
*
* -------------> Execute request immediately.
* |
* |
* request NOT supporting
* rate limiting
* |
* |
* Request ---> [-Request Queue-]
* |
* |
* request supporting
* rate limiting
* |
* |
* ------------> {Rate Limit Group 1 -> Queue 1, ..., Rate Limit Group N -> Queue N}
*
* Explanation: Submit request to the queue for the specific rate limiting group.
* The rate limiting groups are polled at the same specified interval,
* which in the worst cases introduces an additional latency of
* {@link RequestExecutorServiceSettings#getTaskPollFrequency()}.
*/
public class RequestExecutorService implements RequestExecutor {

Expand Down Expand Up @@ -109,6 +134,8 @@ interface RateLimiterCreator {
private final RateLimiterCreator rateLimiterCreator;
private final AtomicReference<Scheduler.Cancellable> cancellableCleanupTask = new AtomicReference<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private final AdjustableCapacityBlockingQueue<RejectableTask> requestQueue;
private volatile Future<?> requestQueueTask;

public RequestExecutorService(
ThreadPool threadPool,
Expand All @@ -135,10 +162,16 @@ public RequestExecutorService(
this.settings = Objects.requireNonNull(settings);
this.clock = Objects.requireNonNull(clock);
this.rateLimiterCreator = Objects.requireNonNull(rateLimiterCreator);
this.requestQueue = new AdjustableCapacityBlockingQueue<>(queueCreator, settings.getQueueCapacity());
}

public void shutdown() {
if (shutdown.compareAndSet(false, true)) {
if (requestQueueTask != null) {
boolean cancelled = FutureUtils.cancel(requestQueueTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, I think it's up to our implementation to check if it is canceled. So I think we'll get stuck in the queue.take() 🤔

It doesn't seem like FutureUtils.cancel() will do an interrupt.

This is how we've handled that in the past: https://github.com/elastic/elasticsearch/blob/8.13/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java#L253

The shutdown() method puts a noop task on the queue to ensure that it wakes up.

Can you double check the tests and make sure we're covering this case (we call shutdown and then await termination)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted with Add NoopTask to wake up queue on shutdown

Can you double check the tests and make sure we're covering this case (we call shutdown and then await termination)?

AFAIU we always check that when calling submitShutdownRequest, right?

logger.debug(() -> format("Request queue cancellation successful: %s", cancelled));
}

if (cancellableCleanupTask.get() != null) {
logger.debug(() -> "Stopping clean up thread");
cancellableCleanupTask.get().cancel();
Expand All @@ -159,7 +192,7 @@ public boolean isTerminated() {
}

public int queueSize() {
return rateLimitGroupings.values().stream().mapToInt(RateLimitingEndpointHandler::queueSize).sum();
return requestQueue.size() + rateLimitGroupings.values().stream().mapToInt(RateLimitingEndpointHandler::queueSize).sum();
}

/**
Expand All @@ -175,8 +208,8 @@ public void start() {

startCleanupTask();
signalStartInitiated();

handleTasks();
startRequestQueueTask();
startHandlingRateLimitedTasks();
} catch (Exception e) {
logger.warn("Failed to start request executor", e);
cleanup();
Copy link
Contributor

@jonathan-buttner jonathan-buttner Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a small potential for an edge case here (if we go the noop task route to do a shutdown for the queue.take()). If an exception occurs in startHandlingRateLimitedTasks(), it could cause the requestQueue to be drained which could mean that it'd never get the noop task.

I'd have to think of a good way to solve that. Maybe we split up the cleanup methods so that this one doesn't drain the requestQueue, instead the processRequestQueue() would call a different cleanup() that'd handle doing that 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expand All @@ -194,6 +227,11 @@ private void startCleanupTask() {
cancellableCleanupTask.set(startCleanupThread(RATE_LIMIT_GROUP_CLEANUP_INTERVAL));
}

private void startRequestQueueTask() {
assert requestQueueTask == null : "The request queue can only be started once";
requestQueueTask = threadPool.executor(UTILITY_THREAD_POOL_NAME).submit(this::processRequestQueue);
}

private Scheduler.Cancellable startCleanupThread(TimeValue interval) {
logger.debug(() -> Strings.format("Clean up task scheduled with interval [%s]", interval));

Expand Down Expand Up @@ -221,7 +259,83 @@ private void scheduleNextHandleTasks(TimeValue timeToWait) {
return;
}

threadPool.schedule(this::handleTasks, timeToWait, threadPool.executor(UTILITY_THREAD_POOL_NAME));
threadPool.schedule(this::startHandlingRateLimitedTasks, timeToWait, threadPool.executor(UTILITY_THREAD_POOL_NAME));
}

private void processRequestQueue() {
try {
while (isShutdown() == false) {
// Blocks the request queue thread until a new request comes in
var task = (RequestTask) requestQueue.take();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the cast?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to replace uses of RequestTask with RejectableTask in this class, since there's no reason that the interface can't be used throughout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if (isShutdown()) {
logger.debug("Shutdown requested while handling request tasks, cleaning up");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@timgrein timgrein Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanup();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we move this to a finally block we probably don't need it here and we could probably remove the return.

return;
} else {
var requestManager = task.getRequestManager();

if (rateLimitingEnabled(requestManager)) {
submitTaskToRateLimitedExecutionPath(task);
} else {
executeTaskImmediately(task);
}
}
}
} catch (InterruptedException e) {
// Restore interrupt to propagate to the calling thread
Thread.currentThread().interrupt();
logger.debug("Inference request queue interrupted, exiting");
} catch (Exception e) {
logger.warn("Error processing task in inference request queue", e);
cleanup();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we move this to a finally block to ensure it gets called.

Copy link
Contributor Author

@timgrein timgrein Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

private void executeTaskImmediately(RequestTask task) {
try {
task.getRequestManager()
.execute(task.getInferenceInputs(), requestSender, task.getRequestCompletedFunction(), task.getListener());
} catch (Exception e) {
logger.warn(
format("Failed to execute fast-path request for inference id [%s]", task.getRequestManager().inferenceEntityId()),
e
);

task.onRejection(
new EsRejectedExecutionException(
format("Failed to execute request for inference id [%s]", task.getRequestManager().inferenceEntityId()),
false
)
);
}
}

// visible for testing
void submitTaskToRateLimitedExecutionPath(RequestTask task) {
var requestManager = task.getRequestManager();
var endpoint = rateLimitGroupings.computeIfAbsent(requestManager.rateLimitGrouping(), key -> {
var endpointHandler = new RateLimitingEndpointHandler(
Integer.toString(requestManager.rateLimitGrouping().hashCode()),
queueCreator,
settings,
requestSender,
clock,
requestManager.rateLimitSettings(),
this::isShutdown,
rateLimiterCreator,
rateLimitDivisor.get()
);

endpointHandler.init();
return endpointHandler;
});

endpoint.enqueue(task);
}

private boolean rateLimitingEnabled(RequestManager requestManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency how about we make this static and accept a RateLimitSettings object. Then we can use it in executeEnqueuedTaskInternal which has a similar check. Technically executeEnqueuedTaskInternal should never receive a non-rate limited task, but it'd probably be good to check just in case.

Copy link
Contributor Author

@timgrein timgrein Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Adjusted with Reuse rateLimitSettingsEnabled check

return requestManager.rateLimitSettings() != null && requestManager.rateLimitSettings().isEnabled();
}

private void cleanup() {
Expand All @@ -234,12 +348,12 @@ private void cleanup() {
}
}

private void handleTasks() {
private void startHandlingRateLimitedTasks() {
try {
TimeValue timeToWait;
do {
if (shutdown.get()) {
logger.debug("Shutdown requested while handling tasks, cleaning up");
if (isShutdown()) {
logger.debug("Shutdown requested while handling rate limited tasks, cleaning up");
cleanup();
return;
}
Expand All @@ -253,17 +367,47 @@ private void handleTasks() {

scheduleNextHandleTasks(timeToWait);
} catch (Exception e) {
logger.warn("Encountered an error while handling tasks", e);
logger.warn("Encountered an error while handling rate limited tasks", e);
cleanup();
}
}

private void notifyRequestsOfShutdown() {
assert isShutdown() : "Requests should only be notified if the executor is shutting down";

// Reject rate-limited requests
for (var endpoint : rateLimitGroupings.values()) {
endpoint.notifyRequestsOfShutdown();
}

// Reject non-rate-limited requests
List<RejectableTask> requests = new ArrayList<>();
requestQueue.drainTo(requests);

for (var request : requests) {
rejectRequest(request);
}
}

private void rejectRequest(RejectableTask task) {
try {
task.onRejection(
new EsRejectedExecutionException(
format(
"Failed to send request for inference id [%s] has shutdown prior to executing request",
task.getRequestManager().inferenceEntityId()
),
true
)
);
} catch (Exception e) {
logger.warn(
format(
"Failed to notify request for inference id [%s] of rejection after executor service shutdown",
task.getRequestManager().inferenceEntityId()
)
);
}
Copy link
Contributor

@DonalEvans DonalEvans Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is duplicated in RateLimitingEndpointHandler, with the only difference being the format of the error message. Would it be possible to extract it to a static method that both places could call? Rather than needing to use the id field directly for the service grouping, it can be derived from the task: Integer.toString(task.getRequestManager().rateLimitGrouping().hashCode())

There are also quite a few other places where we reject tasks due to shutdown that could be extracted to a method call to reduce code duplication and ensure consistency for the error messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the id (inferenceEntityId being the name of the inference endpoint AFAIU) and service grouping are two different things, so IMO we should keep id in the error messages, so it's clear, which endpoint failed to execute a request. I think the hashCode of the rateLimitGrouping could become quite cryptic for a clear log/error message.

But nothing speaks against extracting the common part to a static method 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion, the id field in RateLimitingEndpointHandler is the service grouping hash code. The field is not well named in that respect.

}

// default for testing
Expand Down Expand Up @@ -308,26 +452,29 @@ public void execute(
ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext())
);

var endpoint = rateLimitGroupings.computeIfAbsent(requestManager.rateLimitGrouping(), key -> {
var endpointHandler = new RateLimitingEndpointHandler(
Integer.toString(requestManager.rateLimitGrouping().hashCode()),
queueCreator,
settings,
requestSender,
clock,
requestManager.rateLimitSettings(),
this::isShutdown,
rateLimiterCreator,
rateLimitDivisor.get()
if (isShutdown()) {
task.onRejection(
new EsRejectedExecutionException(
format(
"Failed to enqueue request task for inference id [%s] because the request executor service has been shutdown",
requestManager.inferenceEntityId()
),
true
)
);
return;
}

// TODO: add or create/compute if absent set for new map (service/task-type-key -> rate limit endpoint handler)
boolean taskAccepted = requestQueue.offer(task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, prior to this change, when execute() was called, we would retrieve (or create) a RateLimitingEndpointHandler appropriate to the task's rate limit settings, then add the task to the queue associated with that handler. This meant that there was one queue per RateLimitingEndpointHandler, each with a capacity defined by the RequestExecutorServiceSettings, and that we wouldn't reject a task unless the queue for the handler associated with it was full.

With the new change, all tasks are first submitted to the single requestQueue queue, which has the same capacity as each of the queues managed by the handlers, meaning that we can begin rejecting tasks even though none of the queues associated with the handlers are full, effectively reducing the total number of requests we can process in a given time.

Would it be better to instead only add tasks with no rate limit settings to the request queue, and call submitTaskToRateLimitedExecutionPath() in the execute() method for tasks with rate limit settings? That way, we're not adding rate limited tasks to one queue just to later remove them and add them to another queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very valid point @DonalEvans! I'll adjust the implementation, thanks for flagging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


endpointHandler.init();
return endpointHandler;
});

endpoint.enqueue(task);
if (taskAccepted == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task was accepted we'll want to check is shutdown one last time to ensure we notify this task that we're shutting down.

Here's an example: https://github.com/elastic/elasticsearch/blob/8.13/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java#L302

task.onRejection(
new EsRejectedExecutionException(
format("Failed to enqueue request task for inference id [%s]", requestManager.inferenceEntityId()),
false
)
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit) {
}

// This should only be used for testing.
RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) {
public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) {
if (requestsPerTimeUnit <= 0) {
throw new IllegalArgumentException("requests per minute must be positive");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,12 @@ public void testHttpRequestSender_Throws_WhenCallingSendBeforeStart() throws Exc
PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
var thrownException = expectThrows(
AssertionError.class,
() -> sender.send(RequestManagerTests.createMock(), new EmbeddingsInput(List.of(), null), null, listener)
() -> sender.send(
RequestManagerTests.createMockWithRateLimitingEnabled(),
new EmbeddingsInput(List.of(), null),
null,
listener
)
);
assertThat(thrownException.getMessage(), is("call start() before sending a request"));
}
Expand All @@ -375,7 +380,12 @@ public void testHttpRequestSender_Throws_WhenATimeoutOccurs() throws Exception {
sender.startSynchronously();

PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
sender.send(RequestManagerTests.createMock(), new EmbeddingsInput(List.of(), null), TimeValue.timeValueNanos(1), listener);
sender.send(
RequestManagerTests.createMockWithRateLimitingEnabled(),
new EmbeddingsInput(List.of(), null),
TimeValue.timeValueNanos(1),
listener
);

var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT));

Expand All @@ -397,7 +407,12 @@ public void testHttpRequestSenderWithTimeout_Throws_WhenATimeoutOccurs() throws
sender.startSynchronously();

PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
sender.send(RequestManagerTests.createMock(), new EmbeddingsInput(List.of(), null), TimeValue.timeValueNanos(1), listener);
sender.send(
RequestManagerTests.createMockWithRateLimitingEnabled(),
new EmbeddingsInput(List.of(), null),
TimeValue.timeValueNanos(1),
listener
);

var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT));

Expand Down
Loading