Skip to content

Commit 9e33b57

Browse files
committed
Fix handling of time exceeded exception in fetch phase
The fetch phase is subject to timeouts like any other search phase. Timeouts may happen when low level cancellation is enabled (true by default), hence the directory reader is wrapped into ExitableDirectoryReader and a timeout is provided to the search request. The exception that is used is TimeExceededException, but it is an internal exception that should never be returned to the user. When that is thrown, we need to catch it and throw error or mark the response as timed out depending on whether partial results are allowed or not.
1 parent 6c85934 commit 9e33b57

File tree

5 files changed

+214
-10
lines changed

5 files changed

+214
-10
lines changed

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,16 @@ protected SearchHit nextDoc(int doc) throws IOException {
191191
}
192192
};
193193

194-
SearchHit[] hits = docsIterator.iterate(context.shardTarget(), context.searcher().getIndexReader(), docIdsToLoad);
194+
SearchHit[] hits = docsIterator.iterate(
195+
context.shardTarget(),
196+
context.searcher().getIndexReader(),
197+
docIdsToLoad,
198+
context.request().allowPartialSearchResults()
199+
);
200+
201+
if (docsIterator.isTimedOut()) {
202+
context.queryResult().searchTimedOut(true);
203+
}
195204

196205
if (context.isCancelled()) {
197206
for (SearchHit hit : hits) {

server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.apache.lucene.index.ReaderUtil;
1515
import org.elasticsearch.search.SearchHit;
1616
import org.elasticsearch.search.SearchShardTarget;
17+
import org.elasticsearch.search.internal.ContextIndexSearcher;
18+
import org.elasticsearch.search.query.SearchTimeoutException;
1719

1820
import java.io.IOException;
1921
import java.util.Arrays;
@@ -27,6 +29,12 @@
2729
*/
2830
abstract class FetchPhaseDocsIterator {
2931

32+
private boolean timedOut = false;
33+
34+
public boolean isTimedOut() {
35+
return timedOut;
36+
}
37+
3038
/**
3139
* Called when a new leaf reader is reached
3240
* @param ctx the leaf reader for this set of doc ids
@@ -44,7 +52,7 @@ abstract class FetchPhaseDocsIterator {
4452
/**
4553
* Iterate over a set of docsIds within a particular shard and index reader
4654
*/
47-
public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader indexReader, int[] docIds) {
55+
public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader indexReader, int[] docIds, boolean allowPartialResults) {
4856
SearchHit[] searchHits = new SearchHit[docIds.length];
4957
DocIdToIndex[] docs = new DocIdToIndex[docIds.length];
5058
for (int index = 0; index < docIds.length; index++) {
@@ -71,17 +79,28 @@ public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader inde
7179
assert searchHits[docs[i].index] == null;
7280
searchHits[docs[i].index] = nextDoc(docs[i].docId);
7381
}
74-
} catch (Exception e) {
75-
for (SearchHit searchHit : searchHits) {
76-
if (searchHit != null) {
77-
searchHit.decRef();
78-
}
82+
} catch (ContextIndexSearcher.TimeExceededException timeExceededException) {
83+
if (allowPartialResults) {
84+
timedOut = true;
85+
} else {
86+
purgeSearcHits(searchHits);
87+
throw new SearchTimeoutException(shardTarget, "Time exceeded");
7988
}
89+
} catch (Exception e) {
90+
purgeSearcHits(searchHits);
8091
throw new FetchPhaseExecutionException(shardTarget, "Error running fetch phase for doc [" + currentDoc + "]", e);
8192
}
8293
return searchHits;
8394
}
8495

96+
private static void purgeSearcHits(SearchHit[] searchHits) {
97+
for (SearchHit searchHit : searchHits) {
98+
if (searchHit != null) {
99+
searchHit.decRef();
100+
}
101+
}
102+
}
103+
85104
private static int endReaderIdx(LeafReaderContext currentReaderContext, int index, DocIdToIndex[] docs) {
86105
int firstInNextReader = currentReaderContext.docBase + currentReaderContext.reader().maxDoc();
87106
int i = index + 1;

server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void shardResult(SearchHits hits, ProfileResult profileResult) {
7575

7676
private static boolean assertNoSearchTarget(SearchHits hits) {
7777
for (SearchHit hit : hits.getHits()) {
78-
assert hit.getShard() == null : "expected null but got: " + hit.getShard();
78+
assert hit == null || hit.getShard() == null : "expected null but got: " + hit.getShard();
7979
}
8080
return true;
8181
}

server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,65 @@
88
*/
99
package org.elasticsearch.action.search;
1010

11+
import org.apache.lucene.document.Document;
12+
import org.apache.lucene.index.IndexReader;
13+
import org.apache.lucene.index.LeafReaderContext;
14+
import org.apache.lucene.search.Query;
15+
import org.apache.lucene.search.QueryCachingPolicy;
1116
import org.apache.lucene.search.ScoreDoc;
1217
import org.apache.lucene.search.TopDocs;
1318
import org.apache.lucene.search.TotalHits;
19+
import org.apache.lucene.store.Directory;
20+
import org.apache.lucene.tests.index.RandomIndexWriter;
1421
import org.apache.lucene.tests.store.MockDirectoryWrapper;
22+
import org.apache.lucene.util.Accountable;
1523
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.OriginalIndices;
25+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1626
import org.elasticsearch.common.UUIDs;
1727
import org.elasticsearch.common.breaker.CircuitBreaker;
1828
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1929
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
30+
import org.elasticsearch.common.settings.Settings;
2031
import org.elasticsearch.common.util.concurrent.AtomicArray;
2132
import org.elasticsearch.common.util.concurrent.EsExecutors;
33+
import org.elasticsearch.index.IndexSettings;
34+
import org.elasticsearch.index.IndexVersion;
35+
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
36+
import org.elasticsearch.index.mapper.IdLoader;
37+
import org.elasticsearch.index.mapper.MapperMetrics;
38+
import org.elasticsearch.index.mapper.MappingLookup;
39+
import org.elasticsearch.index.query.SearchExecutionContext;
2240
import org.elasticsearch.index.shard.ShardId;
2341
import org.elasticsearch.search.DocValueFormat;
2442
import org.elasticsearch.search.SearchHit;
2543
import org.elasticsearch.search.SearchHits;
2644
import org.elasticsearch.search.SearchPhaseResult;
2745
import org.elasticsearch.search.SearchShardTarget;
46+
import org.elasticsearch.search.fetch.FetchPhase;
2847
import org.elasticsearch.search.fetch.FetchSearchResult;
48+
import org.elasticsearch.search.fetch.FetchSubPhase;
49+
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
2950
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
3051
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
52+
import org.elasticsearch.search.fetch.StoredFieldsSpec;
53+
import org.elasticsearch.search.internal.AliasFilter;
54+
import org.elasticsearch.search.internal.ContextIndexSearcher;
55+
import org.elasticsearch.search.internal.SearchContext;
3156
import org.elasticsearch.search.internal.ShardSearchContextId;
57+
import org.elasticsearch.search.internal.ShardSearchRequest;
3258
import org.elasticsearch.search.profile.ProfileResult;
3359
import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
3460
import org.elasticsearch.search.profile.SearchProfileShardResult;
3561
import org.elasticsearch.search.query.QuerySearchResult;
62+
import org.elasticsearch.search.query.SearchTimeoutException;
3663
import org.elasticsearch.test.ESTestCase;
3764
import org.elasticsearch.test.InternalAggregationTestCase;
65+
import org.elasticsearch.test.TestSearchContext;
3866
import org.elasticsearch.transport.Transport;
3967

68+
import java.io.IOException;
69+
import java.util.Collections;
4070
import java.util.List;
4171
import java.util.Map;
4272
import java.util.concurrent.CountDownLatch;
@@ -749,4 +779,150 @@ private static void addProfiling(boolean profiled, QuerySearchResult queryResult
749779
private static ProfileResult fetchProfile(boolean profiled) {
750780
return profiled ? new ProfileResult("fetch", "fetch", Map.of(), Map.of(), FETCH_PROFILE_TIME, List.of()) : null;
751781
}
782+
783+
public void testFetchTimeoutWithPartialResults() throws IOException {
784+
Directory dir = newDirectory();
785+
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
786+
w.addDocument(new Document());
787+
IndexReader r = w.getReader();
788+
w.close();
789+
try {
790+
ContextIndexSearcher contextIndexSearcher = new ContextIndexSearcher(r, null, null, new QueryCachingPolicy() {
791+
@Override
792+
public void onUse(Query query) {}
793+
794+
@Override
795+
public boolean shouldCache(Query query) {
796+
return false;
797+
}
798+
}, randomBoolean());
799+
SearchContext searchContext = createSearchContext(contextIndexSearcher, true);
800+
FetchPhase fetchPhase = createFetchPhase(contextIndexSearcher);
801+
fetchPhase.execute(searchContext, new int[] { 0 }, null);
802+
assertTrue(searchContext.queryResult().searchTimedOut());
803+
} finally {
804+
r.close();
805+
dir.close();
806+
}
807+
}
808+
809+
public void testFetchTimeoutNoPartialResults() throws IOException {
810+
Directory dir = newDirectory();
811+
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
812+
w.addDocument(new Document());
813+
IndexReader r = w.getReader();
814+
w.close();
815+
try {
816+
ContextIndexSearcher contextIndexSearcher = new ContextIndexSearcher(r, null, null, new QueryCachingPolicy() {
817+
@Override
818+
public void onUse(Query query) {}
819+
820+
@Override
821+
public boolean shouldCache(Query query) {
822+
return false;
823+
}
824+
}, randomBoolean());
825+
SearchContext searchContext = createSearchContext(contextIndexSearcher, false);
826+
FetchPhase fetchPhase = createFetchPhase(contextIndexSearcher);
827+
expectThrows(SearchTimeoutException.class, () -> fetchPhase.execute(searchContext, new int[] { 0 }, null));
828+
} finally {
829+
r.close();
830+
dir.close();
831+
}
832+
}
833+
834+
private static FetchPhase createFetchPhase(ContextIndexSearcher contextIndexSearcher) {
835+
return new FetchPhase(Collections.singletonList(fetchContext -> new FetchSubPhaseProcessor() {
836+
@Override
837+
public void setNextReader(LeafReaderContext readerContext) {}
838+
839+
@Override
840+
public void process(FetchSubPhase.HitContext hitContext) {
841+
contextIndexSearcher.throwTimeExceededException();
842+
}
843+
844+
@Override
845+
public StoredFieldsSpec storedFieldsSpec() {
846+
return StoredFieldsSpec.NO_REQUIREMENTS;
847+
}
848+
}));
849+
}
850+
851+
private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) {
852+
IndexSettings indexSettings = new IndexSettings(
853+
IndexMetadata.builder("index")
854+
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()))
855+
.numberOfShards(1)
856+
.numberOfReplicas(0)
857+
.creationDate(System.currentTimeMillis())
858+
.build(),
859+
Settings.EMPTY
860+
);
861+
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetFilterCache.Listener() {
862+
@Override
863+
public void onCache(ShardId shardId, Accountable accountable) {
864+
865+
}
866+
867+
@Override
868+
public void onRemoval(ShardId shardId, Accountable accountable) {
869+
870+
}
871+
});
872+
873+
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(
874+
0,
875+
0,
876+
indexSettings,
877+
bitsetFilterCache,
878+
null,
879+
null,
880+
MappingLookup.EMPTY,
881+
null,
882+
null,
883+
null,
884+
null,
885+
null,
886+
null,
887+
null,
888+
null,
889+
null,
890+
null,
891+
null,
892+
Collections.emptyMap(),
893+
null,
894+
MapperMetrics.NOOP
895+
);
896+
TestSearchContext searchContext = new TestSearchContext(searchExecutionContext, null, contextIndexSearcher) {
897+
private final FetchSearchResult fetchSearchResult = new FetchSearchResult();
898+
private final ShardSearchRequest request = new ShardSearchRequest(
899+
OriginalIndices.NONE,
900+
new SearchRequest().allowPartialSearchResults(allowPartialResults),
901+
new ShardId("index", "indexUUID", 0),
902+
0,
903+
1,
904+
AliasFilter.EMPTY,
905+
1f,
906+
0L,
907+
null
908+
);
909+
910+
@Override
911+
public IdLoader newIdLoader() {
912+
return new IdLoader.StoredIdLoader();
913+
}
914+
915+
@Override
916+
public FetchSearchResult fetchResult() {
917+
return fetchSearchResult;
918+
}
919+
920+
@Override
921+
public ShardSearchRequest request() {
922+
return request;
923+
}
924+
};
925+
searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));
926+
return searchContext;
927+
}
752928
}

server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected SearchHit nextDoc(int doc) {
7777
}
7878
};
7979

80-
SearchHit[] hits = it.iterate(null, reader, docs);
80+
SearchHit[] hits = it.iterate(null, reader, docs, randomBoolean());
8181

8282
assertThat(hits.length, equalTo(docs.length));
8383
for (int i = 0; i < hits.length; i++) {
@@ -125,7 +125,7 @@ protected SearchHit nextDoc(int doc) {
125125
}
126126
};
127127

128-
Exception e = expectThrows(FetchPhaseExecutionException.class, () -> it.iterate(null, reader, docs));
128+
Exception e = expectThrows(FetchPhaseExecutionException.class, () -> it.iterate(null, reader, docs, randomBoolean()));
129129
assertThat(e.getMessage(), containsString("Error running fetch phase for doc [" + badDoc + "]"));
130130
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
131131

0 commit comments

Comments
 (0)