Skip to content

Commit 632ceeb

Browse files
authored
Create QueryPhaseForReview
1 parent 91f5791 commit 632ceeb

File tree

1 file changed

+279
-0
lines changed

1 file changed

+279
-0
lines changed
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.query;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.apache.lucene.index.IndexReader;
15+
import org.apache.lucene.index.LeafReaderContext;
16+
import org.apache.lucene.search.BooleanClause;
17+
import org.apache.lucene.search.BooleanQuery;
18+
import org.apache.lucene.search.Collector;
19+
import org.apache.lucene.search.CollectorManager;
20+
import org.apache.lucene.search.FieldDoc;
21+
import org.apache.lucene.search.Query;
22+
import org.apache.lucene.search.ScoreDoc;
23+
import org.apache.lucene.search.ScoreMode;
24+
import org.apache.lucene.search.Sort;
25+
import org.apache.lucene.search.TopDocs;
26+
import org.apache.lucene.search.Weight;
27+
import org.elasticsearch.common.lucene.Lucene;
28+
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
29+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
30+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
31+
import org.elasticsearch.lucene.queries.SearchAfterSortedDocQuery;
32+
import org.elasticsearch.search.DocValueFormat;
33+
import org.elasticsearch.search.SearchContextSourcePrinter;
34+
import org.elasticsearch.search.SearchService;
35+
import org.elasticsearch.search.aggregations.AggregationPhase;
36+
import org.elasticsearch.search.internal.ContextIndexSearcher;
37+
import org.elasticsearch.search.internal.ScrollContext;
38+
import org.elasticsearch.search.internal.SearchContext;
39+
import org.elasticsearch.search.rank.RankSearchContext;
40+
import org.elasticsearch.search.rank.context.QueryPhaseRankShardContext;
41+
import org.elasticsearch.search.rescore.RescorePhase;
42+
import org.elasticsearch.search.sort.SortAndFormats;
43+
import org.elasticsearch.search.suggest.SuggestPhase;
44+
import org.elasticsearch.threadpool.ThreadPool;
45+
46+
import java.util.ArrayList;
47+
import java.util.List;
48+
import java.util.concurrent.ExecutorService;
49+
50+
import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_DISABLED;
51+
52+
/**
53+
* Query phase of a search request, used to run the query and get back from each shard information about the matching documents
54+
* (document ids and score or sort criteria) so that matches can be reduced on the coordinating node
55+
*/
56+
public class QueryPhase {
57+
private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class);
58+
59+
private QueryPhase() {}
60+
61+
public static void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
62+
if (searchContext.queryPhaseRankShardContext() == null) {
63+
if (searchContext.request().source() != null && searchContext.request().source().rankBuilder() != null) {
64+
// if we have a RankBuilder provided, we want to fetch all rankWindowSize results
65+
// and rerank the documents as per the RankBuilder's instructions.
66+
// Pagination will take place later once they're all (re)ranked.
67+
searchContext.size(searchContext.request().source().rankBuilder().rankWindowSize());
68+
searchContext.from(0);
69+
}
70+
executeQuery(searchContext);
71+
} else {
72+
executeRank(searchContext);
73+
}
74+
}
75+
76+
static void executeRank(SearchContext searchContext) throws QueryPhaseExecutionException {
77+
QueryPhaseRankShardContext queryPhaseRankShardContext = searchContext.queryPhaseRankShardContext();
78+
QuerySearchResult querySearchResult = searchContext.queryResult();
79+
80+
// run the combined boolean query total hits or aggregations
81+
// otherwise mark top docs as empty
82+
if (searchContext.trackTotalHitsUpTo() != TRACK_TOTAL_HITS_DISABLED || searchContext.aggregations() != null) {
83+
searchContext.size(0);
84+
QueryPhase.executeQuery(searchContext);
85+
} else {
86+
searchContext.queryResult().topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
87+
}
88+
89+
List<TopDocs> rrfRankResults = new ArrayList<>();
90+
boolean searchTimedOut = querySearchResult.searchTimedOut();
91+
long serviceTimeEWMA = querySearchResult.serviceTimeEWMA();
92+
int nodeQueueSize = querySearchResult.nodeQueueSize();
93+
try {
94+
// run each of the rank queries
95+
for (Query rankQuery : queryPhaseRankShardContext.queries()) {
96+
// if a search timeout occurs, exit with partial results
97+
if (searchTimedOut) {
98+
break;
99+
}
100+
try (
101+
RankSearchContext rankSearchContext = new RankSearchContext(
102+
searchContext,
103+
rankQuery,
104+
queryPhaseRankShardContext.rankWindowSize()
105+
)
106+
) {
107+
QueryPhase.addCollectorsAndSearch(rankSearchContext);
108+
QuerySearchResult rrfQuerySearchResult = rankSearchContext.queryResult();
109+
rrfRankResults.add(rrfQuerySearchResult.topDocs().topDocs);
110+
serviceTimeEWMA += rrfQuerySearchResult.serviceTimeEWMA();
111+
nodeQueueSize = Math.max(nodeQueueSize, rrfQuerySearchResult.nodeQueueSize());
112+
searchTimedOut = rrfQuerySearchResult.searchTimedOut();
113+
}
114+
}
115+
116+
querySearchResult.setRankShardResult(queryPhaseRankShardContext.combineQueryPhaseResults(rrfRankResults));
117+
118+
// record values relevant to all queries
119+
querySearchResult.searchTimedOut(searchTimedOut);
120+
querySearchResult.serviceTimeEWMA(serviceTimeEWMA);
121+
querySearchResult.nodeQueueSize(nodeQueueSize);
122+
} catch (Exception e) {
123+
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute rank query", e);
124+
}
125+
}
126+
127+
static void executeQuery(SearchContext searchContext) throws QueryPhaseExecutionException {
128+
if (searchContext.hasOnlySuggest()) {
129+
SuggestPhase.execute(searchContext);
130+
searchContext.queryResult().topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
131+
return;
132+
}
133+
134+
if (LOGGER.isTraceEnabled()) {
135+
LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
136+
}
137+
138+
// Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
139+
// request, preProcess is called on the DFS phase, this is why we pre-process them
140+
// here to make sure it happens during the QUERY phase
141+
AggregationPhase.preProcess(searchContext);
142+
143+
addCollectorsAndSearch(searchContext);
144+
145+
RescorePhase.execute(searchContext);
146+
SuggestPhase.execute(searchContext);
147+
148+
if (searchContext.getProfilers() != null) {
149+
searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults());
150+
}
151+
}
152+
153+
/**
154+
* In a package-private method so that it can be tested without having to
155+
* wire everything (mapperService, etc.)
156+
*/
157+
static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhaseExecutionException {
158+
final ContextIndexSearcher searcher = searchContext.searcher();
159+
final IndexReader reader = searcher.getIndexReader();
160+
QuerySearchResult queryResult = searchContext.queryResult();
161+
queryResult.searchTimedOut(false);
162+
try {
163+
queryResult.from(searchContext.from());
164+
queryResult.size(searchContext.size());
165+
Query query = searchContext.rewrittenQuery();
166+
assert query == searcher.rewrite(query); // already rewritten
167+
168+
final ScrollContext scrollContext = searchContext.scrollContext();
169+
if (scrollContext != null) {
170+
if (scrollContext.totalHits == null) {
171+
// first round
172+
assert scrollContext.lastEmittedDoc == null;
173+
// there is not much that we can optimize here since we want to collect all
174+
// documents in order to get the total number of hits
175+
176+
} else {
177+
final ScoreDoc after = scrollContext.lastEmittedDoc;
178+
if (canEarlyTerminate(reader, searchContext.sort())) {
179+
// now this gets interesting: since the search sort is a prefix of the index sort, we can directly
180+
// skip to the desired doc
181+
if (after != null) {
182+
query = new BooleanQuery.Builder().add(query, BooleanClause.Occur.MUST)
183+
.add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER)
184+
.build();
185+
}
186+
}
187+
}
188+
}
189+
190+
final boolean hasFilterCollector = searchContext.parsedPostFilter() != null || searchContext.minimumScore() != null;
191+
192+
Weight postFilterWeight = null;
193+
if (searchContext.parsedPostFilter() != null) {
194+
postFilterWeight = searcher.createWeight(
195+
searcher.rewrite(searchContext.parsedPostFilter().query()),
196+
ScoreMode.COMPLETE_NO_SCORES,
197+
1f
198+
);
199+
}
200+
201+
CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager(
202+
postFilterWeight,
203+
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
204+
searchContext,
205+
hasFilterCollector
206+
);
207+
208+
final Runnable timeoutRunnable = getTimeoutCheck(searchContext);
209+
if (timeoutRunnable != null) {
210+
searcher.addQueryCancellation(timeoutRunnable);
211+
}
212+
213+
QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
214+
if (searchContext.getProfilers() != null) {
215+
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
216+
}
217+
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
218+
if (searcher.timeExceeded()) {
219+
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
220+
SearchTimeoutException.handleTimeout(
221+
searchContext.request().allowPartialSearchResults(),
222+
searchContext.shardTarget(),
223+
searchContext.queryResult()
224+
);
225+
}
226+
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
227+
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
228+
}
229+
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
230+
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
231+
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
232+
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
233+
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
234+
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
235+
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
236+
}
237+
} catch (Exception e) {
238+
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
239+
}
240+
}
241+
242+
/**
243+
* Returns whether collection within the provided <code>reader</code> can be early-terminated if it sorts
244+
* with <code>sortAndFormats</code>.
245+
**/
246+
private static boolean canEarlyTerminate(IndexReader reader, SortAndFormats sortAndFormats) {
247+
if (sortAndFormats == null || sortAndFormats.sort == null) {
248+
return false;
249+
}
250+
final Sort sort = sortAndFormats.sort;
251+
for (LeafReaderContext ctx : reader.leaves()) {
252+
Sort indexSort = ctx.reader().getMetaData().sort();
253+
if (indexSort == null || Lucene.canEarlyTerminate(sort, indexSort) == false) {
254+
return false;
255+
}
256+
}
257+
return true;
258+
}
259+
260+
public static Runnable getTimeoutCheck(SearchContext searchContext) {
261+
boolean timeoutSet = searchContext.scrollContext() == null
262+
&& searchContext.timeout() != null
263+
&& searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
264+
265+
if (timeoutSet) {
266+
final long startTime = searchContext.getRelativeTimeInMillis();
267+
final long timeout = searchContext.timeout().millis();
268+
final long maxTime = startTime + timeout;
269+
return () -> {
270+
final long time = searchContext.getRelativeTimeInMillis();
271+
if (time > maxTime) {
272+
searchContext.searcher().throwTimeExceededException();
273+
}
274+
};
275+
} else {
276+
return null;
277+
}
278+
}
279+
}

0 commit comments

Comments
 (0)