Skip to content

Commit 3a8422d

Browse files
committed
Use a bounded size queue for inference tasks.
1 parent 340c189 commit 3a8422d

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -9,14 +9,14 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
12-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1312
import org.elasticsearch.threadpool.ThreadPool;
1413
import org.elasticsearch.xpack.core.inference.action.InferenceAction;
1514
import org.elasticsearch.xpack.esql.inference.InferenceRunner;
1615
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
1716

1817
import java.util.ArrayList;
1918
import java.util.List;
19+
import java.util.concurrent.ArrayBlockingQueue;
2020
import java.util.concurrent.BlockingQueue;
2121
import java.util.concurrent.ExecutorService;
2222
import java.util.concurrent.Semaphore;
@@ -114,13 +114,14 @@ private void sendResponseOnCompletion() {
114114
private static class ThrottledInferenceRunner {
115115
private final InferenceRunner inferenceRunner;
116116
private final ExecutorService executorService;
117-
private final BlockingQueue<AbstractRunnable> pendingRequests = ConcurrentCollections.newBlockingQueue();
117+
private final BlockingQueue<AbstractRunnable> pendingRequestsQueue;
118118
private final Semaphore permits;
119119

120120
private ThrottledInferenceRunner(InferenceRunner inferenceRunner, ExecutorService executorService, int maxRunningTasks) {
121121
this.executorService = executorService;
122122
this.permits = new Semaphore(maxRunningTasks);
123123
this.inferenceRunner = inferenceRunner;
124+
this.pendingRequestsQueue = new ArrayBlockingQueue<>(maxRunningTasks, true);
124125
}
125126

126127
public static ThrottledInferenceRunner create(
@@ -138,7 +139,7 @@ public void doInference(InferenceAction.Request request, ActionListener<Inferenc
138139

139140
private void executePendingRequests() {
140141
while (permits.tryAcquire()) {
141-
AbstractRunnable task = pendingRequests.poll();
142+
AbstractRunnable task = pendingRequestsQueue.poll();
142143

143144
if (task == null) {
144145
permits.release();
@@ -156,7 +157,7 @@ private void executePendingRequests() {
156157

157158
private void enqueueTask(InferenceAction.Request request, ActionListener<InferenceAction.Response> listener) {
158159
try {
159-
pendingRequests.add(createTask(request, listener));
160+
pendingRequestsQueue.put(createTask(request, listener));
160161
executePendingRequests();
161162
} catch (Exception e) {
162163
listener.onFailure(new IllegalStateException("An error occurred while adding the inference request to the queue", e));

0 commit comments

Comments
 (0)