Skip to content

Commit ff776e4

Browse files
authored
Force execution of SearchService.Reaper (#106555)
If the search threadpool fills up then we may reject execution of `SearchService.Reaper` which means it stops retrying. We must instead force its execution so that it keeps on going. With #106542, closes #106543 Backport of #106544 to 7.17
1 parent a86aacd commit ff776e4

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

docs/changelog/106544.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 106544
2+
summary: Force execution of `SearchService.Reaper`
3+
area: Search
4+
type: bug
5+
issues:
6+
- 106543

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@
3939
import org.elasticsearch.common.unit.ByteSizeValue;
4040
import org.elasticsearch.common.util.BigArrays;
4141
import org.elasticsearch.common.util.CollectionUtils;
42+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4243
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
44+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4345
import org.elasticsearch.core.Releasable;
4446
import org.elasticsearch.core.Releasables;
4547
import org.elasticsearch.core.TimeValue;
@@ -1434,9 +1436,9 @@ public ResponseCollectorService getResponseCollectorService() {
14341436
return this.responseCollectorService;
14351437
}
14361438

1437-
class Reaper implements Runnable {
1439+
class Reaper extends AbstractRunnable {
14381440
@Override
1439-
public void run() {
1441+
protected void doRun() {
14401442
assert Transports.assertNotTransportThread("closing contexts may do IO, e.g. deleting dangling files")
14411443
&& ThreadPool.assertNotScheduleThread("closing contexts may do IO, e.g. deleting dangling files");
14421444
for (ReaderContext context : activeReaders.values()) {
@@ -1446,6 +1448,27 @@ public void run() {
14461448
}
14471449
}
14481450
}
1451+
1452+
@Override
1453+
public void onFailure(Exception e) {
1454+
logger.error("unexpected error when freeing search contexts", e);
1455+
assert false : e;
1456+
}
1457+
1458+
@Override
1459+
public void onRejection(Exception e) {
1460+
if (e instanceof EsRejectedExecutionException && ((EsRejectedExecutionException) e).isExecutorShutdown()) {
1461+
logger.debug("rejected execution when freeing search contexts");
1462+
} else {
1463+
onFailure(e);
1464+
}
1465+
}
1466+
1467+
@Override
1468+
public boolean isForceExecution() {
1469+
// mustn't reject this task even if the queue is full
1470+
return true;
1471+
}
14491472
}
14501473

14511474
public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String> resolvedExpressions) {

0 commit comments

Comments
 (0)