Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f1e7419
Remove worst-case additional 50ms latency for non-rate limited requests
timgrein Oct 8, 2025
a326e31
Update docs/changelog/136167.yaml
timgrein Oct 8, 2025
191029f
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 8, 2025
edaa0f0
Do not use forbidden API
timgrein Oct 8, 2025
57c5605
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 8, 2025
f98b82e
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 8, 2025
f17ec00
Move startRequestQueueTask before start signal
timgrein Oct 8, 2025
90ee1a1
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 8, 2025
a9e7610
Cleanup in finally block
timgrein Oct 8, 2025
ec513be
Reject request on shutdown
timgrein Oct 8, 2025
174526c
Reuse rateLimitSettingsEnabled check
timgrein Oct 8, 2025
f506cb3
Add NoopTask to wake up queue on shutdown
timgrein Oct 9, 2025
ae349fd
Only add non-rate-limited tasks to fast-path request queue
timgrein Oct 9, 2025
540f49d
Extract rejection logic to common static method
timgrein Oct 9, 2025
0dca88a
Remove unnecessary cast
timgrein Oct 10, 2025
0590561
Use string placeholder in assertion
timgrein Oct 10, 2025
2e65475
Adjust test to check that a throwing task does not terminate the service
timgrein Oct 10, 2025
4fb2372
Adjust error message in general exception handler
timgrein Oct 10, 2025
2930151
Adjust warn to error
timgrein Oct 10, 2025
b2fd85f
Adjust error message when request gets rejected
timgrein Oct 13, 2025
91f387a
Rename id in RateLimitingEndpointHandler to rateLimitGroupingId
timgrein Oct 13, 2025
90e672b
Use Strings.format(...) in assertion
timgrein Oct 13, 2025
1cf24dc
Use thenAnswer instead of suppression
timgrein Oct 13, 2025
a92a7c0
Only reject requests of the respective execution path (rate-limited v…
timgrein Oct 13, 2025
8e52c22
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 14, 2025
a868152
Submit only ingest embeddings requests to rate-limited execution path
timgrein Oct 14, 2025
dd53fcb
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/136167.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 136167
summary: "[Inference API] Remove worst-case additional 50ms latency for non-rate limited\
\ requests"
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -53,6 +54,29 @@
* {@link org.apache.http.client.methods.HttpUriRequest} to set a timeout for how long this executor will wait
* attempting to execute a task (aka waiting for the connection manager to lease a connection). See
* {@link org.apache.http.client.config.RequestConfig.Builder#setConnectionRequestTimeout} for more info.
*
* The request flow looks as follows:
*
* -------------> Add request to fast-path request queue.
* |
* |
* request NOT supporting
* rate limiting
* |
* |
* Request ------------|
* |
* |
* request supporting
* rate limiting
* |
* |
* ------------> {Rate Limit Group 1 -> Queue 1, ..., Rate Limit Group N -> Queue N}
*
* Explanation: Submit the request to the queue of its specific rate limiting group.
* All rate limiting groups are polled at the same configured interval,
* which in the worst case introduces an additional latency of
* {@link RequestExecutorServiceSettings#getTaskPollFrequency()}.
*/
public class RequestExecutorService implements RequestExecutor {

Expand Down Expand Up @@ -109,6 +133,8 @@ interface RateLimiterCreator {
private final RateLimiterCreator rateLimiterCreator;
private final AtomicReference<Scheduler.Cancellable> cancellableCleanupTask = new AtomicReference<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private final AdjustableCapacityBlockingQueue<RejectableTask> requestQueue;
private volatile Future<?> requestQueueTask;

public RequestExecutorService(
ThreadPool threadPool,
Expand All @@ -135,10 +161,16 @@ public RequestExecutorService(
this.settings = Objects.requireNonNull(settings);
this.clock = Objects.requireNonNull(clock);
this.rateLimiterCreator = Objects.requireNonNull(rateLimiterCreator);
this.requestQueue = new AdjustableCapacityBlockingQueue<>(queueCreator, settings.getQueueCapacity());
}

public void shutdown() {
if (shutdown.compareAndSet(false, true)) {
if (requestQueueTask != null) {
// Wakes up the queue in processRequestQueue
requestQueue.offer(NoopTask);
}

if (cancellableCleanupTask.get() != null) {
logger.debug(() -> "Stopping clean up thread");
cancellableCleanupTask.get().cancel();
Expand All @@ -159,7 +191,7 @@ public boolean isTerminated() {
}

public int queueSize() {
return rateLimitGroupings.values().stream().mapToInt(RateLimitingEndpointHandler::queueSize).sum();
return requestQueue.size() + rateLimitGroupings.values().stream().mapToInt(RateLimitingEndpointHandler::queueSize).sum();
}

/**
Expand All @@ -174,9 +206,9 @@ public void start() {
started.set(true);

startCleanupTask();
startRequestQueueTask();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we move this above the signalStartInitiated()? That way all the threading stuff is done prior to the signal start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

signalStartInitiated();

handleTasks();
startHandlingRateLimitedTasks();
} catch (Exception e) {
logger.warn("Failed to start request executor", e);
cleanup();
Copy link
Contributor

@jonathan-buttner jonathan-buttner Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a small potential for an edge case here (if we go the noop task route to do a shutdown for the queue.take()). If an exception occurs in startHandlingRateLimitedTasks(), it could cause the requestQueue to be drained which could mean that it'd never get the noop task.

I'd have to think of a good way to solve that. Maybe we split up the cleanup methods so that this one doesn't drain the requestQueue, instead the processRequestQueue() would call a different cleanup() that'd handle doing that 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expand All @@ -194,6 +226,11 @@ private void startCleanupTask() {
cancellableCleanupTask.set(startCleanupThread(RATE_LIMIT_GROUP_CLEANUP_INTERVAL));
}

/**
 * Submits {@link #processRequestQueue()} to the utility thread pool and keeps the returned future in
 * {@code requestQueueTask}. Expected to be called at most once per instance.
 */
private void startRequestQueueTask() {
    assert requestQueueTask == null : "The request queue can only be started once";

    var utilityExecutor = threadPool.executor(UTILITY_THREAD_POOL_NAME);
    requestQueueTask = utilityExecutor.submit(this::processRequestQueue);
}

private Scheduler.Cancellable startCleanupThread(TimeValue interval) {
logger.debug(() -> Strings.format("Clean up task scheduled with interval [%s]", interval));

Expand Down Expand Up @@ -221,7 +258,86 @@ private void scheduleNextHandleTasks(TimeValue timeToWait) {
return;
}

threadPool.schedule(this::handleTasks, timeToWait, threadPool.executor(UTILITY_THREAD_POOL_NAME));
threadPool.schedule(this::startHandlingRateLimitedTasks, timeToWait, threadPool.executor(UTILITY_THREAD_POOL_NAME));
}

/**
 * Worker loop for the fast-path request queue. Blocks on {@code requestQueue.take()} and hands every taken
 * task to {@link #executeTaskImmediately(RejectableTask)} until the service is shut down, the thread is
 * interrupted, or an unexpected exception escapes the loop. {@code cleanup()} runs on every exit path.
 */
private void processRequestQueue() {
try {
while (isShutdown() == false) {
// Blocks until a task (or the NoopTask marker) is available
var task = requestQueue.take();

// NoopTask carries no request; shutdown() offers it so that this blocked take() returns
if (task == NoopTask) {
if (isShutdown()) {
logger.debug("Shutdown requested, exiting request queue processing");
break;
}

// Skip processing NoopTask
continue;
}

// A real task was taken after shutdown was requested: reject it instead of executing it
if (isShutdown()) {
logger.debug("Shutdown requested while handling request tasks, cleaning up");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@timgrein timgrein Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rejectNonRateLimitedRequest(task);
break;
}

executeTaskImmediately(task);
}
} catch (InterruptedException e) {
// Restore the interrupt flag so code further up the stack can observe the interruption
Thread.currentThread().interrupt();
logger.debug("Inference request queue interrupted, exiting");
} catch (Exception e) {
logger.error("Unexpected error processing request queue, terminating", e);
} finally {
// Runs regardless of how the loop ended (shutdown, interrupt or unexpected error)
cleanup();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we move this to a finally block to ensure it gets called.

Copy link
Contributor Author

@timgrein timgrein Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

/**
 * Executes a fast-path (non-rate-limited) task right away via its request manager.
 * <p>
 * If execution throws, the failure is logged and the task is rejected with an
 * {@link EsRejectedExecutionException}. Any exception raised while reporting that rejection is logged as well
 * instead of being propagated, so a misbehaving task or listener cannot end the loop in
 * {@link #processRequestQueue()}.
 *
 * @param task the task to execute; must be a real request task, not the {@code NoopTask} marker
 */
private void executeTaskImmediately(RejectableTask task) {
    try {
        task.getRequestManager()
            .execute(task.getInferenceInputs(), requestSender, task.getRequestCompletedFunction(), task.getListener());
    } catch (Exception e) {
        try {
            var inferenceEntityId = task.getRequestManager().inferenceEntityId();

            logger.warn(format("Failed to execute fast-path request for inference id [%s]", inferenceEntityId), e);

            task.onRejection(
                new EsRejectedExecutionException(
                    format("Failed to execute request for inference id [%s]", inferenceEntityId),
                    false
                )
            );
        } catch (Exception rejectionFailure) {
            // Keep the original execution failure visible alongside the failure to report it
            rejectionFailure.addSuppressed(e);
            logger.warn("Failed to notify fast-path request of rejection after execution failure", rejectionFailure);
        }
    }
}

/**
 * Enqueues the task on the handler of its rate limit grouping, creating and initializing that handler
 * first if the grouping has none yet.
 */
// visible for testing
void submitTaskToRateLimitedExecutionPath(RequestTask task) {
    var manager = task.getRequestManager();

    rateLimitGroupings.computeIfAbsent(manager.rateLimitGrouping(), ignoredKey -> {
        var handler = new RateLimitingEndpointHandler(
            Integer.toString(manager.rateLimitGrouping().hashCode()),
            queueCreator,
            settings,
            requestSender,
            clock,
            manager.rateLimitSettings(),
            this::isShutdown,
            rateLimiterCreator,
            rateLimitDivisor.get()
        );
        handler.init();
        return handler;
    }).enqueue(task);
}

/**
 * Tells whether rate limiting applies for the given settings.
 *
 * @param rateLimitSettings the settings to inspect, may be {@code null}
 * @return {@code true} only if the settings are non-null and report themselves as enabled
 */
private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) {
    if (rateLimitSettings == null) {
        return false;
    }

    return rateLimitSettings.isEnabled();
}

private void cleanup() {
Expand All @@ -234,12 +350,12 @@ private void cleanup() {
}
}

private void handleTasks() {
private void startHandlingRateLimitedTasks() {
try {
TimeValue timeToWait;
do {
if (shutdown.get()) {
logger.debug("Shutdown requested while handling tasks, cleaning up");
if (isShutdown()) {
logger.debug("Shutdown requested while handling rate limited tasks, cleaning up");
cleanup();
return;
}
Expand All @@ -253,17 +369,44 @@ private void handleTasks() {

scheduleNextHandleTasks(timeToWait);
} catch (Exception e) {
logger.warn("Encountered an error while handling tasks", e);
logger.warn("Encountered an error while handling rate limited tasks", e);
cleanup();
}
}

/**
 * Rejects every request that is still queued once the executor is shutting down: first the requests held by
 * the rate limit groupings, then whatever remains in the fast-path request queue.
 */
private void notifyRequestsOfShutdown() {
    assert isShutdown() : "Requests should only be notified if the executor is shutting down";

    // Reject rate-limited requests
    for (var endpoint : rateLimitGroupings.values()) {
        endpoint.notifyRequestsOfShutdown();
    }

    // Reject non-rate-limited requests
    List<RejectableTask> requests = new ArrayList<>();
    requestQueue.drainTo(requests);

    for (var request : requests) {
        // shutdown() offers the NoopTask marker into this queue. It is not a real request and all of its
        // accessors throw, so rejecting it would abort the loop and leave later tasks without a rejection.
        if (request == NoopTask) {
            continue;
        }

        rejectNonRateLimitedRequest(request);
    }
}

/**
 * Rejects a task from the fast-path (non-rate-limited) queue by delegating to {@code rejectRequest}, passing
 * the rejection message for the task and the message to log if delivering that rejection fails.
 * Reads the inference entity id from the task's request manager, so it must not be called with the
 * {@code NoopTask} marker.
 */
private void rejectNonRateLimitedRequest(RejectableTask task) {
var inferenceEntityId = task.getRequestManager().inferenceEntityId();

rejectRequest(
task,
// NOTE(review): this message reads ungrammatically; the review thread below proposes a clearer wording
format("Failed to send request for inference id [%s] has shutdown prior to executing request", inferenceEntityId),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message might be better as "Failed to send request for inference id [%s] because the request executor service has been shutdown" to make it consistent with the error we report in execute() if we try to queue a task when we're shut down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format("Failed to notify request for inference id [%s] of rejection after executor service shutdown", inferenceEntityId)
);
}

/**
 * Rejects the task with an {@link EsRejectedExecutionException} flagged as caused by executor shutdown.
 * A failure while delivering the rejection is logged, including its cause, and never propagated.
 *
 * @param task the task to reject
 * @param rejectionMessage message of the exception handed to {@code task.onRejection}
 * @param rejectionFailedMessage message logged if {@code task.onRejection} itself throws
 */
private static void rejectRequest(RejectableTask task, String rejectionMessage, String rejectionFailedMessage) {
    try {
        task.onRejection(new EsRejectedExecutionException(rejectionMessage, true));
    } catch (Exception e) {
        // Log the exception too, otherwise the reason the notification failed is lost
        logger.warn(rejectionFailedMessage, e);
    }
}

// default for testing
Expand Down Expand Up @@ -308,26 +451,33 @@ public void execute(
ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext())
);

var endpoint = rateLimitGroupings.computeIfAbsent(requestManager.rateLimitGrouping(), key -> {
var endpointHandler = new RateLimitingEndpointHandler(
Integer.toString(requestManager.rateLimitGrouping().hashCode()),
queueCreator,
settings,
requestSender,
clock,
requestManager.rateLimitSettings(),
this::isShutdown,
rateLimiterCreator,
rateLimitDivisor.get()
if (isShutdown()) {
task.onRejection(
new EsRejectedExecutionException(
format(
"Failed to enqueue request task for inference id [%s] because the request executor service has been shutdown",
requestManager.inferenceEntityId()
),
true
)
);
return;
}

// TODO: add or create/compute if absent set for new map (service/task-type-key -> rate limit endpoint handler)

endpointHandler.init();
return endpointHandler;
});
if (rateLimitingEnabled(requestManager.rateLimitSettings())) {
submitTaskToRateLimitedExecutionPath(task);
} else {
boolean taskAccepted = requestQueue.offer(task);

endpoint.enqueue(task);
if (taskAccepted == false) {
task.onRejection(
new EsRejectedExecutionException(
format("Failed to enqueue request task for inference id [%s]", requestManager.inferenceEntityId()),
false
)
);
}
}
}

/**
Expand Down Expand Up @@ -423,7 +573,7 @@ public synchronized TimeValue executeEnqueuedTask() {
}

private TimeValue executeEnqueuedTaskInternal() {
if (rateLimitSettings.isEnabled()) {
if (rateLimitingEnabled(rateLimitSettings)) {
var timeBeforeAvailableToken = rateLimiter.timeToReserve(1);
if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) {
return timeBeforeAvailableToken;
Expand Down Expand Up @@ -514,27 +664,18 @@ public synchronized void notifyRequestsOfShutdown() {

private void rejectTasks(List<RejectableTask> tasks) {
for (var task : tasks) {
rejectTaskForShutdown(task);
}
}
var inferenceEntityId = task.getRequestManager().inferenceEntityId();

private void rejectTaskForShutdown(RejectableTask task) {
try {
task.onRejection(
new EsRejectedExecutionException(
format(
"Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request",
id,
task.getRequestManager().inferenceEntityId()
),
true
)
);
} catch (Exception e) {
logger.warn(
rejectRequest(
task,
format(
"Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request",
id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly related to this PR, but since you're making changes in this class, could you rename the id field on RateLimitingEndpointHandler to be something more descriptive, like rateLimitGroupingId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inferenceEntityId
),
format(
"Failed to notify request for inference id [%s] of rejection after executor service grouping [%s] shutdown",
task.getRequestManager().inferenceEntityId(),
inferenceEntityId,
id
)
);
Expand All @@ -549,4 +690,37 @@ public void close() {
requestExecutorServiceSettings.deregisterQueueCapacityCallback(id);
}
}

/**
 * Marker task placed into the request queue purely as a signal (for example to wake up a blocked consumer).
 * It carries no request: every method throws {@link UnsupportedOperationException}, so it must only ever be
 * compared by identity.
 */
private static final RejectableTask NoopTask = new RejectableTask() {
    private static final String MARKER_ONLY = "NoopTask is a pure marker class for signals in the request queue";

    @Override
    public void onRejection(Exception e) {
        throw new UnsupportedOperationException(MARKER_ONLY);
    }

    @Override
    public RequestManager getRequestManager() {
        throw new UnsupportedOperationException(MARKER_ONLY);
    }

    @Override
    public InferenceInputs getInferenceInputs() {
        throw new UnsupportedOperationException(MARKER_ONLY);
    }

    @Override
    public ActionListener<InferenceServiceResults> getListener() {
        throw new UnsupportedOperationException(MARKER_ONLY);
    }

    @Override
    public boolean hasCompleted() {
        throw new UnsupportedOperationException(MARKER_ONLY);
    }

    @Override
    public Supplier<Boolean> getRequestCompletedFunction() {
        throw new UnsupportedOperationException(MARKER_ONLY);
    }
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit) {
}

// This should only be used for testing.
RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) {
public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) {
if (requestsPerTimeUnit <= 0) {
throw new IllegalArgumentException("requests per minute must be positive");
}
Expand Down
Loading