Skip to content

Commit bcd6c1d

Browse files
Fix race in AbstractSearchAsyncAction request throttling (#116264)
We had a race here where the non-blocking pending execution would be starved of executing threads. This happened when all the current holders of permits from the semaphore would release their permit after a producer thread failed to acquire a permit and then enqueued its task. => need to peek the queue again after releasing the permit and try to acquire a new permit if there's work left to be done to avoid this scenario.
1 parent 3b2021f commit bcd6c1d

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,6 @@ tests:
237237
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
238238
method: testSearchWithRandomDisconnects
239239
issue: https://github.com/elastic/elasticsearch/issues/116175
240-
- class: org.elasticsearch.indexing.IndexActionIT
241-
method: testAutoGenerateIdNoDuplicates
242-
issue: https://github.com/elastic/elasticsearch/issues/115716
243240
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
244241
method: test {p0=ml/start_stop_datafeed/Test start datafeed given index pattern with no matching indices}
245242
issue: https://github.com/elastic/elasticsearch/issues/116220

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
import java.util.List;
4949
import java.util.Map;
5050
import java.util.concurrent.ConcurrentHashMap;
51+
import java.util.concurrent.ConcurrentLinkedQueue;
5152
import java.util.concurrent.Executor;
52-
import java.util.concurrent.LinkedTransferQueue;
5353
import java.util.concurrent.Semaphore;
5454
import java.util.concurrent.atomic.AtomicBoolean;
5555
import java.util.concurrent.atomic.AtomicInteger;
@@ -751,7 +751,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
751751

752752
private static final class PendingExecutions {
753753
private final Semaphore semaphore;
754-
private final LinkedTransferQueue<Consumer<Releasable>> queue = new LinkedTransferQueue<>();
754+
private final ConcurrentLinkedQueue<Consumer<Releasable>> queue = new ConcurrentLinkedQueue<>();
755755

756756
PendingExecutions(int permits) {
757757
assert permits > 0 : "not enough permits: " + permits;
@@ -770,11 +770,10 @@ void submit(Consumer<Releasable> task) {
770770
}
771771
}
772772
}
773-
774773
}
775774

776775
private void executeAndRelease(Consumer<Releasable> task) {
777-
while (task != null) {
776+
do {
778777
final SubscribableListener<Void> onDone = new SubscribableListener<>();
779778
task.accept(() -> onDone.onResponse(null));
780779
if (onDone.isDone()) {
@@ -797,13 +796,21 @@ public void onFailure(Exception e) {
797796
});
798797
return;
799798
}
800-
}
799+
} while (task != null);
801800
}
802801

803802
private Consumer<Releasable> pollNextTaskOrReleasePermit() {
804803
var task = queue.poll();
805804
if (task == null) {
806805
semaphore.release();
806+
while (queue.peek() != null && semaphore.tryAcquire()) {
807+
task = queue.poll();
808+
if (task == null) {
809+
semaphore.release();
810+
} else {
811+
return task;
812+
}
813+
}
807814
}
808815
return task;
809816
}

0 commit comments

Comments
 (0)