Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -449,27 +449,23 @@ public synchronized TimeValue executeEnqueuedTask() {
}

/**
 * Polls the next task from the queue and hands it to its request manager, but only when the rate limiter
 * reports that a request token can be reserved without waiting.
 *
 * @return the value reported by {@code rateLimiter.timeToReserve(1)} when execution cannot happen immediately;
 *         {@code NO_TASKS_AVAILABLE} when {@code shouldExecuteTask} rejects the polled task;
 *         {@code EXECUTED_A_TASK} once the task has been passed to its request manager
 */
private TimeValue executeEnqueuedTaskInternal() {
    // Ask how long a reservation would take before polling, so a task is only removed from the queue
    // when it can be executed right away.
    // NOTE(review): presumably timeToReserve() does not consume a token - confirm against the rate limiter.
    var timeBeforeAvailableToken = rateLimiter.timeToReserve(1);
    if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) {
        return timeBeforeAvailableToken;
    }

    var task = queue.poll();

    // TODO Batching - in a situation where no new tasks are queued we'll want to execute any prepared tasks
    // So we'll need to check for null and call a helper method executePreparedTasks()

    if (shouldExecuteTask(task) == false) {
        return NO_TASKS_AVAILABLE;
    }

    // We should never have to wait because we checked above
    var reserveRes = rateLimiter.reserve(1);
    assert shouldExecuteImmediately(reserveRes) : "Reserving request tokens required a sleep when it should not have";

    task.getRequestManager()
        .execute(task.getInferenceInputs(), requestSender, task.getRequestCompletedFunction(), task.getListener());
    return EXECUTED_A_TASK;
}

private static boolean shouldExecuteTask(RejectableTask task) {
Expand Down