Skip to content

Commit 730f42a

Browse files
authored
Handle all exceptions in data nodes can match (#117469)
During the can match phase, prior to the query phase, we may have exceptions that are returned back to the coordinating node, handled gracefully as if the shard returned canMatch=true. During the query phase, we perform an additional rewrite and can match phase to eventually shortcut the query phase for the shard. That needs to handle exceptions as well. Currently, an exception there causes shard failures, while we should rather go ahead and execute the query on the shard. Instead of adding another try catch on consumers code, this commit adds exception handling to the method itself so that it can no longer throw exceptions and similar mistakes can no longer be made in the future. At the same time, this commit makes the can match method more easily testable without requiring a full-blown SearchService instance. Closes #104994
1 parent 47be542 commit 730f42a

File tree

9 files changed

+3524
-3021
lines changed

9 files changed

+3524
-3021
lines changed

docs/changelog/117469.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 117469
2+
summary: Handle exceptions in query phase can match
3+
area: Search
4+
type: bug
5+
issues:
6+
- 104994

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 126 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@
147147
import java.util.concurrent.atomic.AtomicBoolean;
148148
import java.util.concurrent.atomic.AtomicInteger;
149149
import java.util.concurrent.atomic.AtomicLong;
150+
import java.util.function.BiFunction;
151+
import java.util.function.Function;
150152
import java.util.function.LongSupplier;
151153
import java.util.function.Supplier;
152154

@@ -549,16 +551,17 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task,
549551
// check if we can shortcut the query phase entirely.
550552
if (orig.canReturnNullResponseIfMatchNoDocs()) {
551553
assert orig.scroll() == null;
552-
final CanMatchShardResponse canMatchResp;
553-
try {
554-
ShardSearchRequest clone = new ShardSearchRequest(orig);
555-
canMatchResp = canMatch(clone, false);
556-
} catch (Exception exc) {
557-
l.onFailure(exc);
558-
return;
559-
}
554+
ShardSearchRequest clone = new ShardSearchRequest(orig);
555+
CanMatchContext canMatchContext = new CanMatchContext(
556+
clone,
557+
indicesService::indexServiceSafe,
558+
this::findReaderContext,
559+
defaultKeepAlive,
560+
maxKeepAlive
561+
);
562+
CanMatchShardResponse canMatchResp = canMatch(canMatchContext, false);
560563
if (canMatchResp.canMatch() == false) {
561-
l.onResponse(QuerySearchResult.nullInstance());
564+
listener.onResponse(QuerySearchResult.nullInstance());
562565
return;
563566
}
564567
}
@@ -1191,25 +1194,37 @@ public void freeAllScrollContexts() {
11911194
}
11921195

11931196
private long getKeepAlive(ShardSearchRequest request) {
1197+
return getKeepAlive(request, defaultKeepAlive, maxKeepAlive);
1198+
}
1199+
1200+
private static long getKeepAlive(ShardSearchRequest request, long defaultKeepAlive, long maxKeepAlive) {
11941201
if (request.scroll() != null) {
1195-
return getScrollKeepAlive(request.scroll());
1202+
return getScrollKeepAlive(request.scroll(), defaultKeepAlive, maxKeepAlive);
11961203
} else if (request.keepAlive() != null) {
1197-
checkKeepAliveLimit(request.keepAlive().millis());
1204+
checkKeepAliveLimit(request.keepAlive().millis(), maxKeepAlive);
11981205
return request.keepAlive().getMillis();
11991206
} else {
12001207
return request.readerId() == null ? defaultKeepAlive : -1;
12011208
}
12021209
}
12031210

12041211
private long getScrollKeepAlive(Scroll scroll) {
1212+
return getScrollKeepAlive(scroll, defaultKeepAlive, maxKeepAlive);
1213+
}
1214+
1215+
private static long getScrollKeepAlive(Scroll scroll, long defaultKeepAlive, long maxKeepAlive) {
12051216
if (scroll != null && scroll.keepAlive() != null) {
1206-
checkKeepAliveLimit(scroll.keepAlive().millis());
1217+
checkKeepAliveLimit(scroll.keepAlive().millis(), maxKeepAlive);
12071218
return scroll.keepAlive().getMillis();
12081219
}
12091220
return defaultKeepAlive;
12101221
}
12111222

12121223
private void checkKeepAliveLimit(long keepAlive) {
1224+
checkKeepAliveLimit(keepAlive, maxKeepAlive);
1225+
}
1226+
1227+
private static void checkKeepAliveLimit(long keepAlive, long maxKeepAlive) {
12131228
if (keepAlive > maxKeepAlive) {
12141229
throw new IllegalArgumentException(
12151230
"Keep alive for request ("
@@ -1620,6 +1635,7 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
16201635
final List<CanMatchNodeResponse.ResponseOrFailure> responses = new ArrayList<>(shardLevelRequests.size());
16211636
for (var shardLevelRequest : shardLevelRequests) {
16221637
try {
1638+
// TODO remove the exception handling as it's now in canMatch itself
16231639
responses.add(new CanMatchNodeResponse.ResponseOrFailure(canMatch(request.createShardSearchRequest(shardLevelRequest))));
16241640
} catch (Exception e) {
16251641
responses.add(new CanMatchNodeResponse.ResponseOrFailure(e));
@@ -1631,82 +1647,145 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
16311647
/**
16321648
* This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query
16331649
* to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query
1634-
* won't match any documents on the current shard.
1650+
* won't match any documents on the current shard. Exceptions are handled within the method, and never re-thrown.
16351651
*/
1636-
public CanMatchShardResponse canMatch(ShardSearchRequest request) throws IOException {
1637-
return canMatch(request, true);
1652+
public CanMatchShardResponse canMatch(ShardSearchRequest request) {
1653+
CanMatchContext canMatchContext = new CanMatchContext(
1654+
request,
1655+
indicesService::indexServiceSafe,
1656+
this::findReaderContext,
1657+
defaultKeepAlive,
1658+
maxKeepAlive
1659+
);
1660+
return canMatch(canMatchContext, true);
16381661
}
16391662

1640-
private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
1641-
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
1663+
static class CanMatchContext {
1664+
private final ShardSearchRequest request;
1665+
private final Function<Index, IndexService> indexServiceLookup;
1666+
private final BiFunction<ShardSearchContextId, TransportRequest, ReaderContext> findReaderContext;
1667+
private final long defaultKeepAlive;
1668+
private final long maxKeepAlive;
1669+
1670+
private IndexService indexService;
1671+
1672+
CanMatchContext(
1673+
ShardSearchRequest request,
1674+
Function<Index, IndexService> indexServiceLookup,
1675+
BiFunction<ShardSearchContextId, TransportRequest, ReaderContext> findReaderContext,
1676+
long defaultKeepAlive,
1677+
long maxKeepAlive
1678+
) {
1679+
this.request = request;
1680+
this.indexServiceLookup = indexServiceLookup;
1681+
this.findReaderContext = findReaderContext;
1682+
this.defaultKeepAlive = defaultKeepAlive;
1683+
this.maxKeepAlive = maxKeepAlive;
1684+
}
1685+
1686+
long getKeepAlive() {
1687+
return SearchService.getKeepAlive(request, defaultKeepAlive, maxKeepAlive);
1688+
}
1689+
1690+
ReaderContext findReaderContext() {
1691+
return findReaderContext.apply(request.readerId(), request);
1692+
}
1693+
1694+
QueryRewriteContext getQueryRewriteContext(IndexService indexService) {
1695+
return indexService.newQueryRewriteContext(request::nowInMillis, request.getRuntimeMappings(), request.getClusterAlias());
1696+
}
1697+
1698+
SearchExecutionContext getSearchExecutionContext(Engine.Searcher searcher) {
1699+
return getIndexService().newSearchExecutionContext(
1700+
request.shardId().id(),
1701+
0,
1702+
searcher,
1703+
request::nowInMillis,
1704+
request.getClusterAlias(),
1705+
request.getRuntimeMappings()
1706+
);
1707+
}
1708+
1709+
IndexShard getShard() {
1710+
return getIndexService().getShard(request.shardId().getId());
1711+
}
1712+
1713+
IndexService getIndexService() {
1714+
if (this.indexService == null) {
1715+
this.indexService = indexServiceLookup.apply(request.shardId().getIndex());
1716+
}
1717+
return this.indexService;
1718+
}
1719+
}
1720+
1721+
static CanMatchShardResponse canMatch(CanMatchContext canMatchContext, boolean checkRefreshPending) {
1722+
assert canMatchContext.request.searchType() == SearchType.QUERY_THEN_FETCH
1723+
: "unexpected search type: " + canMatchContext.request.searchType();
16421724
Releasable releasable = null;
16431725
try {
16441726
IndexService indexService;
16451727
final boolean hasRefreshPending;
16461728
final Engine.Searcher canMatchSearcher;
1647-
if (request.readerId() != null) {
1729+
if (canMatchContext.request.readerId() != null) {
16481730
hasRefreshPending = false;
16491731
ReaderContext readerContext;
16501732
Engine.Searcher searcher;
16511733
try {
1652-
readerContext = findReaderContext(request.readerId(), request);
1653-
releasable = readerContext.markAsUsed(getKeepAlive(request));
1734+
readerContext = canMatchContext.findReaderContext();
1735+
releasable = readerContext.markAsUsed(canMatchContext.getKeepAlive());
16541736
indexService = readerContext.indexService();
1655-
if (canMatchAfterRewrite(request, indexService) == false) {
1737+
QueryRewriteContext queryRewriteContext = canMatchContext.getQueryRewriteContext(indexService);
1738+
if (queryStillMatchesAfterRewrite(canMatchContext.request, queryRewriteContext) == false) {
16561739
return new CanMatchShardResponse(false, null);
16571740
}
16581741
searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
16591742
} catch (SearchContextMissingException e) {
1660-
final String searcherId = request.readerId().getSearcherId();
1743+
final String searcherId = canMatchContext.request.readerId().getSearcherId();
16611744
if (searcherId == null) {
1662-
throw e;
1745+
return new CanMatchShardResponse(true, null);
16631746
}
1664-
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
1665-
if (canMatchAfterRewrite(request, indexService) == false) {
1747+
if (queryStillMatchesAfterRewrite(
1748+
canMatchContext.request,
1749+
canMatchContext.getQueryRewriteContext(canMatchContext.getIndexService())
1750+
) == false) {
16661751
return new CanMatchShardResponse(false, null);
16671752
}
1668-
IndexShard indexShard = indexService.getShard(request.shardId().getId());
1669-
final Engine.SearcherSupplier searcherSupplier = indexShard.acquireSearcherSupplier();
1753+
final Engine.SearcherSupplier searcherSupplier = canMatchContext.getShard().acquireSearcherSupplier();
16701754
if (searcherId.equals(searcherSupplier.getSearcherId()) == false) {
16711755
searcherSupplier.close();
1672-
throw e;
1756+
return new CanMatchShardResponse(true, null);
16731757
}
16741758
releasable = searcherSupplier;
16751759
searcher = searcherSupplier.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
16761760
}
16771761
canMatchSearcher = searcher;
16781762
} else {
1679-
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
1680-
if (canMatchAfterRewrite(request, indexService) == false) {
1763+
if (queryStillMatchesAfterRewrite(
1764+
canMatchContext.request,
1765+
canMatchContext.getQueryRewriteContext(canMatchContext.getIndexService())
1766+
) == false) {
16811767
return new CanMatchShardResponse(false, null);
16821768
}
1683-
IndexShard indexShard = indexService.getShard(request.shardId().getId());
1684-
boolean needsWaitForRefresh = request.waitForCheckpoint() != UNASSIGNED_SEQ_NO;
1769+
boolean needsWaitForRefresh = canMatchContext.request.waitForCheckpoint() != UNASSIGNED_SEQ_NO;
16851770
// If this request wait_for_refresh behavior, it is safest to assume a refresh is pending. Theoretically,
16861771
// this can be improved in the future by manually checking that the requested checkpoint has already been refresh.
16871772
// However, this will request modifying the engine to surface that information.
1773+
IndexShard indexShard = canMatchContext.getShard();
16881774
hasRefreshPending = needsWaitForRefresh || (indexShard.hasRefreshPending() && checkRefreshPending);
16891775
canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
16901776
}
16911777
try (canMatchSearcher) {
1692-
SearchExecutionContext context = indexService.newSearchExecutionContext(
1693-
request.shardId().id(),
1694-
0,
1695-
canMatchSearcher,
1696-
request::nowInMillis,
1697-
request.getClusterAlias(),
1698-
request.getRuntimeMappings()
1699-
);
1700-
final boolean canMatch = queryStillMatchesAfterRewrite(request, context);
1701-
final MinAndMax<?> minMax;
1778+
SearchExecutionContext context = canMatchContext.getSearchExecutionContext(canMatchSearcher);
1779+
final boolean canMatch = queryStillMatchesAfterRewrite(canMatchContext.request, context);
17021780
if (canMatch || hasRefreshPending) {
1703-
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
1704-
minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
1705-
} else {
1706-
minMax = null;
1781+
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(canMatchContext.request.source());
1782+
final MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
1783+
return new CanMatchShardResponse(true, minMax);
17071784
}
1708-
return new CanMatchShardResponse(canMatch || hasRefreshPending, minMax);
1785+
return new CanMatchShardResponse(false, null);
17091786
}
1787+
} catch (Exception e) {
1788+
return new CanMatchShardResponse(true, null);
17101789
} finally {
17111790
Releasables.close(releasable);
17121791
}
@@ -1719,15 +1798,6 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check
17191798
* {@link MatchNoneQueryBuilder}. This allows us to avoid extra work for example making the shard search active and waiting for
17201799
* refreshes.
17211800
*/
1722-
private static boolean canMatchAfterRewrite(final ShardSearchRequest request, final IndexService indexService) throws IOException {
1723-
final QueryRewriteContext queryRewriteContext = indexService.newQueryRewriteContext(
1724-
request::nowInMillis,
1725-
request.getRuntimeMappings(),
1726-
request.getClusterAlias()
1727-
);
1728-
return queryStillMatchesAfterRewrite(request, queryRewriteContext);
1729-
}
1730-
17311801
@SuppressWarnings("unchecked")
17321802
public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException {
17331803
Rewriteable.rewrite(request.getRewriteable(), context, false);

0 commit comments

Comments
 (0)