Skip to content

Commit 267abe7

Browse files
authored
Fix handling of time exceeded exception in fetch phase (#116676)
The fetch phase is subject to timeouts like any other search phase. A timeout can occur when low-level cancellation is enabled (true by default), in which case the directory reader is wrapped in an ExitableDirectoryReader, and a timeout is provided in the search request. The timeout is signalled with TimeExceededException, which is an internal exception that should never be returned to the user. When it is thrown, we need to catch it and either throw a SearchTimeoutException (when partial results are not allowed) or mark the response as timed out and return the hits fetched so far (when partial results are allowed).
1 parent 2ea8e12 commit 267abe7

File tree

6 files changed

+268
-20
lines changed

6 files changed

+268
-20
lines changed

docs/changelog/116676.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116676
2+
summary: Fix handling of time exceeded exception in fetch phase
3+
area: Search
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,16 @@ protected SearchHit nextDoc(int doc) throws IOException {
191191
}
192192
};
193193

194-
SearchHit[] hits = docsIterator.iterate(context.shardTarget(), context.searcher().getIndexReader(), docIdsToLoad);
194+
SearchHit[] hits = docsIterator.iterate(
195+
context.shardTarget(),
196+
context.searcher().getIndexReader(),
197+
docIdsToLoad,
198+
context.request().allowPartialSearchResults()
199+
);
200+
201+
if (docsIterator.isTimedOut()) {
202+
context.queryResult().searchTimedOut(true);
203+
}
195204

196205
if (context.isCancelled()) {
197206
for (SearchHit hit : hits) {

server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import org.apache.lucene.index.LeafReaderContext;
1414
import org.apache.lucene.index.ReaderUtil;
1515
import org.elasticsearch.search.SearchHit;
16+
import org.elasticsearch.search.SearchHits;
1617
import org.elasticsearch.search.SearchShardTarget;
18+
import org.elasticsearch.search.internal.ContextIndexSearcher;
19+
import org.elasticsearch.search.query.SearchTimeoutException;
1720

1821
import java.io.IOException;
1922
import java.util.Arrays;
@@ -27,6 +30,12 @@
2730
*/
2831
abstract class FetchPhaseDocsIterator {
2932

33+
private boolean timedOut = false;
34+
35+
public boolean isTimedOut() {
36+
return timedOut;
37+
}
38+
3039
/**
3140
* Called when a new leaf reader is reached
3241
* @param ctx the leaf reader for this set of doc ids
@@ -44,7 +53,7 @@ abstract class FetchPhaseDocsIterator {
4453
/**
4554
* Iterate over a set of docsIds within a particular shard and index reader
4655
*/
47-
public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader indexReader, int[] docIds) {
56+
public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader indexReader, int[] docIds, boolean allowPartialResults) {
4857
SearchHit[] searchHits = new SearchHit[docIds.length];
4958
DocIdToIndex[] docs = new DocIdToIndex[docIds.length];
5059
for (int index = 0; index < docIds.length; index++) {
@@ -58,30 +67,55 @@ public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader inde
5867
LeafReaderContext ctx = indexReader.leaves().get(leafOrd);
5968
int endReaderIdx = endReaderIdx(ctx, 0, docs);
6069
int[] docsInLeaf = docIdsInLeaf(0, endReaderIdx, docs, ctx.docBase);
61-
setNextReader(ctx, docsInLeaf);
62-
for (int i = 0; i < docs.length; i++) {
63-
if (i >= endReaderIdx) {
64-
leafOrd = ReaderUtil.subIndex(docs[i].docId, indexReader.leaves());
65-
ctx = indexReader.leaves().get(leafOrd);
66-
endReaderIdx = endReaderIdx(ctx, i, docs);
67-
docsInLeaf = docIdsInLeaf(i, endReaderIdx, docs, ctx.docBase);
68-
setNextReader(ctx, docsInLeaf);
70+
try {
71+
setNextReader(ctx, docsInLeaf);
72+
} catch (ContextIndexSearcher.TimeExceededException timeExceededException) {
73+
if (allowPartialResults) {
74+
timedOut = true;
75+
return SearchHits.EMPTY;
6976
}
70-
currentDoc = docs[i].docId;
71-
assert searchHits[docs[i].index] == null;
72-
searchHits[docs[i].index] = nextDoc(docs[i].docId);
77+
throw new SearchTimeoutException(shardTarget, "Time exceeded");
7378
}
74-
} catch (Exception e) {
75-
for (SearchHit searchHit : searchHits) {
76-
if (searchHit != null) {
77-
searchHit.decRef();
79+
for (int i = 0; i < docs.length; i++) {
80+
try {
81+
if (i >= endReaderIdx) {
82+
leafOrd = ReaderUtil.subIndex(docs[i].docId, indexReader.leaves());
83+
ctx = indexReader.leaves().get(leafOrd);
84+
endReaderIdx = endReaderIdx(ctx, i, docs);
85+
docsInLeaf = docIdsInLeaf(i, endReaderIdx, docs, ctx.docBase);
86+
setNextReader(ctx, docsInLeaf);
87+
}
88+
currentDoc = docs[i].docId;
89+
assert searchHits[docs[i].index] == null;
90+
searchHits[docs[i].index] = nextDoc(docs[i].docId);
91+
} catch (ContextIndexSearcher.TimeExceededException timeExceededException) {
92+
if (allowPartialResults) {
93+
timedOut = true;
94+
SearchHit[] partialSearchHits = new SearchHit[i];
95+
System.arraycopy(searchHits, 0, partialSearchHits, 0, i);
96+
return partialSearchHits;
97+
}
98+
purgeSearchHits(searchHits);
99+
throw new SearchTimeoutException(shardTarget, "Time exceeded");
78100
}
79101
}
102+
} catch (SearchTimeoutException e) {
103+
throw e;
104+
} catch (Exception e) {
105+
purgeSearchHits(searchHits);
80106
throw new FetchPhaseExecutionException(shardTarget, "Error running fetch phase for doc [" + currentDoc + "]", e);
81107
}
82108
return searchHits;
83109
}
84110

111+
private static void purgeSearchHits(SearchHit[] searchHits) {
112+
for (SearchHit searchHit : searchHits) {
113+
if (searchHit != null) {
114+
searchHit.decRef();
115+
}
116+
}
117+
}
118+
85119
private static int endReaderIdx(LeafReaderContext currentReaderContext, int index, DocIdToIndex[] docs) {
86120
int firstInNextReader = currentReaderContext.docBase + currentReaderContext.reader().maxDoc();
87121
int i = index + 1;

server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,65 @@
88
*/
99
package org.elasticsearch.action.search;
1010

11+
import org.apache.lucene.document.Document;
12+
import org.apache.lucene.index.IndexReader;
13+
import org.apache.lucene.index.LeafReaderContext;
14+
import org.apache.lucene.search.Query;
15+
import org.apache.lucene.search.QueryCachingPolicy;
1116
import org.apache.lucene.search.ScoreDoc;
1217
import org.apache.lucene.search.TopDocs;
1318
import org.apache.lucene.search.TotalHits;
19+
import org.apache.lucene.store.Directory;
20+
import org.apache.lucene.tests.index.RandomIndexWriter;
1421
import org.apache.lucene.tests.store.MockDirectoryWrapper;
22+
import org.apache.lucene.util.Accountable;
1523
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.OriginalIndices;
25+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1626
import org.elasticsearch.common.UUIDs;
1727
import org.elasticsearch.common.breaker.CircuitBreaker;
1828
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1929
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
30+
import org.elasticsearch.common.settings.Settings;
2031
import org.elasticsearch.common.util.concurrent.AtomicArray;
2132
import org.elasticsearch.common.util.concurrent.EsExecutors;
33+
import org.elasticsearch.index.IndexSettings;
34+
import org.elasticsearch.index.IndexVersion;
35+
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
36+
import org.elasticsearch.index.mapper.IdLoader;
37+
import org.elasticsearch.index.mapper.MapperMetrics;
38+
import org.elasticsearch.index.mapper.MappingLookup;
39+
import org.elasticsearch.index.query.SearchExecutionContext;
2240
import org.elasticsearch.index.shard.ShardId;
2341
import org.elasticsearch.search.DocValueFormat;
2442
import org.elasticsearch.search.SearchHit;
2543
import org.elasticsearch.search.SearchHits;
2644
import org.elasticsearch.search.SearchPhaseResult;
2745
import org.elasticsearch.search.SearchShardTarget;
46+
import org.elasticsearch.search.fetch.FetchPhase;
2847
import org.elasticsearch.search.fetch.FetchSearchResult;
48+
import org.elasticsearch.search.fetch.FetchSubPhase;
49+
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
2950
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
3051
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
52+
import org.elasticsearch.search.fetch.StoredFieldsSpec;
53+
import org.elasticsearch.search.internal.AliasFilter;
54+
import org.elasticsearch.search.internal.ContextIndexSearcher;
55+
import org.elasticsearch.search.internal.SearchContext;
3156
import org.elasticsearch.search.internal.ShardSearchContextId;
57+
import org.elasticsearch.search.internal.ShardSearchRequest;
3258
import org.elasticsearch.search.profile.ProfileResult;
3359
import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
3460
import org.elasticsearch.search.profile.SearchProfileShardResult;
3561
import org.elasticsearch.search.query.QuerySearchResult;
62+
import org.elasticsearch.search.query.SearchTimeoutException;
3663
import org.elasticsearch.test.ESTestCase;
3764
import org.elasticsearch.test.InternalAggregationTestCase;
65+
import org.elasticsearch.test.TestSearchContext;
3866
import org.elasticsearch.transport.Transport;
3967

68+
import java.io.IOException;
69+
import java.util.Collections;
4070
import java.util.List;
4171
import java.util.Map;
4272
import java.util.concurrent.CountDownLatch;
@@ -747,4 +777,159 @@ private static void addProfiling(boolean profiled, QuerySearchResult queryResult
747777
private static ProfileResult fetchProfile(boolean profiled) {
748778
return profiled ? new ProfileResult("fetch", "fetch", Map.of(), Map.of(), FETCH_PROFILE_TIME, List.of()) : null;
749779
}
780+
781+
public void testFetchTimeoutWithPartialResults() throws IOException {
782+
Directory dir = newDirectory();
783+
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
784+
w.addDocument(new Document());
785+
w.addDocument(new Document());
786+
w.addDocument(new Document());
787+
IndexReader r = w.getReader();
788+
w.close();
789+
ContextIndexSearcher contextIndexSearcher = createSearcher(r);
790+
try (SearchContext searchContext = createSearchContext(contextIndexSearcher, true)) {
791+
FetchPhase fetchPhase = createFetchPhase(contextIndexSearcher);
792+
fetchPhase.execute(searchContext, new int[] { 0, 1, 2 }, null);
793+
assertTrue(searchContext.queryResult().searchTimedOut());
794+
assertEquals(1, searchContext.fetchResult().hits().getHits().length);
795+
} finally {
796+
r.close();
797+
dir.close();
798+
}
799+
}
800+
801+
public void testFetchTimeoutNoPartialResults() throws IOException {
802+
Directory dir = newDirectory();
803+
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
804+
w.addDocument(new Document());
805+
w.addDocument(new Document());
806+
w.addDocument(new Document());
807+
IndexReader r = w.getReader();
808+
w.close();
809+
ContextIndexSearcher contextIndexSearcher = createSearcher(r);
810+
811+
try (SearchContext searchContext = createSearchContext(contextIndexSearcher, false)) {
812+
FetchPhase fetchPhase = createFetchPhase(contextIndexSearcher);
813+
expectThrows(SearchTimeoutException.class, () -> fetchPhase.execute(searchContext, new int[] { 0, 1, 2 }, null));
814+
assertNull(searchContext.fetchResult().hits());
815+
} finally {
816+
r.close();
817+
dir.close();
818+
}
819+
}
820+
821+
private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException {
822+
return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() {
823+
@Override
824+
public void onUse(Query query) {}
825+
826+
@Override
827+
public boolean shouldCache(Query query) {
828+
return false;
829+
}
830+
}, randomBoolean());
831+
}
832+
833+
private static FetchPhase createFetchPhase(ContextIndexSearcher contextIndexSearcher) {
834+
return new FetchPhase(Collections.singletonList(fetchContext -> new FetchSubPhaseProcessor() {
835+
boolean processCalledOnce = false;
836+
837+
@Override
838+
public void setNextReader(LeafReaderContext readerContext) {}
839+
840+
@Override
841+
public void process(FetchSubPhase.HitContext hitContext) {
842+
// we throw only once one doc has been fetched, so we can test partial results are returned
843+
if (processCalledOnce) {
844+
contextIndexSearcher.throwTimeExceededException();
845+
} else {
846+
processCalledOnce = true;
847+
}
848+
}
849+
850+
@Override
851+
public StoredFieldsSpec storedFieldsSpec() {
852+
return StoredFieldsSpec.NO_REQUIREMENTS;
853+
}
854+
}));
855+
}
856+
857+
private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) {
858+
IndexSettings indexSettings = new IndexSettings(
859+
IndexMetadata.builder("index")
860+
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()))
861+
.numberOfShards(1)
862+
.numberOfReplicas(0)
863+
.creationDate(System.currentTimeMillis())
864+
.build(),
865+
Settings.EMPTY
866+
);
867+
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetFilterCache.Listener() {
868+
@Override
869+
public void onCache(ShardId shardId, Accountable accountable) {
870+
871+
}
872+
873+
@Override
874+
public void onRemoval(ShardId shardId, Accountable accountable) {
875+
876+
}
877+
});
878+
879+
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(
880+
0,
881+
0,
882+
indexSettings,
883+
bitsetFilterCache,
884+
null,
885+
null,
886+
MappingLookup.EMPTY,
887+
null,
888+
null,
889+
null,
890+
null,
891+
null,
892+
null,
893+
null,
894+
null,
895+
null,
896+
null,
897+
null,
898+
Collections.emptyMap(),
899+
null,
900+
MapperMetrics.NOOP
901+
);
902+
TestSearchContext searchContext = new TestSearchContext(searchExecutionContext, null, contextIndexSearcher) {
903+
private final FetchSearchResult fetchSearchResult = new FetchSearchResult();
904+
private final ShardSearchRequest request = new ShardSearchRequest(
905+
OriginalIndices.NONE,
906+
new SearchRequest().allowPartialSearchResults(allowPartialResults),
907+
new ShardId("index", "indexUUID", 0),
908+
0,
909+
1,
910+
AliasFilter.EMPTY,
911+
1f,
912+
0L,
913+
null
914+
);
915+
916+
@Override
917+
public IdLoader newIdLoader() {
918+
return new IdLoader.StoredIdLoader();
919+
}
920+
921+
@Override
922+
public FetchSearchResult fetchResult() {
923+
return fetchSearchResult;
924+
}
925+
926+
@Override
927+
public ShardSearchRequest request() {
928+
return request;
929+
}
930+
};
931+
searchContext.addReleasable(searchContext.fetchResult()::decRef);
932+
searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));
933+
return searchContext;
934+
}
750935
}

server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected SearchHit nextDoc(int doc) {
7777
}
7878
};
7979

80-
SearchHit[] hits = it.iterate(null, reader, docs);
80+
SearchHit[] hits = it.iterate(null, reader, docs, randomBoolean());
8181

8282
assertThat(hits.length, equalTo(docs.length));
8383
for (int i = 0; i < hits.length; i++) {
@@ -125,7 +125,7 @@ protected SearchHit nextDoc(int doc) {
125125
}
126126
};
127127

128-
Exception e = expectThrows(FetchPhaseExecutionException.class, () -> it.iterate(null, reader, docs));
128+
Exception e = expectThrows(FetchPhaseExecutionException.class, () -> it.iterate(null, reader, docs, randomBoolean()));
129129
assertThat(e.getMessage(), containsString("Error running fetch phase for doc [" + badDoc + "]"));
130130
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
131131

0 commit comments

Comments
 (0)