Skip to content

Commit 205675d

Browse files
Fix race in AbstractSearchAsyncAction request throttling (#116264) (#117638)
We had a race here where the non-blocking pending execution would be starved of executing threads. This happened when all the current holders of permits from the semaphore would release their permit after a producer thread failed to acquire a permit and then enqueued its task. => need to peek the queue again after releasing the permit and try to acquire a new permit if there's work left to be done to avoid this scenario.
1 parent 74c760f commit 205675d

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.ConcurrentLinkedQueue;
5253
import java.util.concurrent.Executor;
53-
import java.util.concurrent.LinkedTransferQueue;
5454
import java.util.concurrent.Semaphore;
5555
import java.util.concurrent.atomic.AtomicBoolean;
5656
import java.util.concurrent.atomic.AtomicInteger;
@@ -792,7 +792,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
792792

793793
private static final class PendingExecutions {
794794
private final Semaphore semaphore;
795-
private final LinkedTransferQueue<Consumer<Releasable>> queue = new LinkedTransferQueue<>();
795+
private final ConcurrentLinkedQueue<Consumer<Releasable>> queue = new ConcurrentLinkedQueue<>();
796796

797797
PendingExecutions(int permits) {
798798
assert permits > 0 : "not enough permits: " + permits;
@@ -811,11 +811,10 @@ void submit(Consumer<Releasable> task) {
811811
}
812812
}
813813
}
814-
815814
}
816815

817816
private void executeAndRelease(Consumer<Releasable> task) {
818-
while (task != null) {
817+
do {
819818
final SubscribableListener<Void> onDone = new SubscribableListener<>();
820819
task.accept(() -> onDone.onResponse(null));
821820
if (onDone.isDone()) {
@@ -838,13 +837,21 @@ public void onFailure(Exception e) {
838837
});
839838
return;
840839
}
841-
}
840+
} while (task != null);
842841
}
843842

844843
private Consumer<Releasable> pollNextTaskOrReleasePermit() {
845844
var task = queue.poll();
846845
if (task == null) {
847846
semaphore.release();
847+
while (queue.peek() != null && semaphore.tryAcquire()) {
848+
task = queue.poll();
849+
if (task == null) {
850+
semaphore.release();
851+
} else {
852+
return task;
853+
}
854+
}
848855
}
849856
return task;
850857
}

0 commit comments

Comments
 (0)