Skip to content

Commit 99d9d99

Browse files
Fix race in AbstractSearchAsyncAction request throttling (#116264) (#117426)
We had a race here where the non-blocking pending execution would be starved of executing threads. This happened when all the current holders of permits from the semaphore would release their permit after a producer thread failed to acquire a permit and then enqueued its task. => need to peek the queue again after releasing the permit and try to acquire a new permit if there's work left to be done to avoid this scenario.
1 parent 7953516 commit 99d9d99

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.ConcurrentLinkedQueue;
5253
import java.util.concurrent.Executor;
53-
import java.util.concurrent.LinkedTransferQueue;
5454
import java.util.concurrent.Semaphore;
5555
import java.util.concurrent.atomic.AtomicBoolean;
5656
import java.util.concurrent.atomic.AtomicInteger;
@@ -830,7 +830,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
830830

831831
private static final class PendingExecutions {
832832
private final Semaphore semaphore;
833-
private final LinkedTransferQueue<Consumer<Releasable>> queue = new LinkedTransferQueue<>();
833+
private final ConcurrentLinkedQueue<Consumer<Releasable>> queue = new ConcurrentLinkedQueue<>();
834834

835835
PendingExecutions(int permits) {
836836
assert permits > 0 : "not enough permits: " + permits;
@@ -849,11 +849,10 @@ void submit(Consumer<Releasable> task) {
849849
}
850850
}
851851
}
852-
853852
}
854853

855854
private void executeAndRelease(Consumer<Releasable> task) {
856-
while (task != null) {
855+
do {
857856
final SubscribableListener<Void> onDone = new SubscribableListener<>();
858857
task.accept(() -> onDone.onResponse(null));
859858
if (onDone.isDone()) {
@@ -876,13 +875,21 @@ public void onFailure(Exception e) {
876875
});
877876
return;
878877
}
879-
}
878+
} while (task != null);
880879
}
881880

882881
private Consumer<Releasable> pollNextTaskOrReleasePermit() {
883882
var task = queue.poll();
884883
if (task == null) {
885884
semaphore.release();
885+
while (queue.peek() != null && semaphore.tryAcquire()) {
886+
task = queue.poll();
887+
if (task == null) {
888+
semaphore.release();
889+
} else {
890+
return task;
891+
}
892+
}
886893
}
887894
return task;
888895
}

0 commit comments

Comments
 (0)