Skip to content

Commit b5d6880

Browse files
committed
iter
1 parent 810fd02 commit b5d6880

File tree

5 files changed

+35
-37
lines changed

5 files changed

+35
-37
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,6 @@ tests:
330330
- class: org.elasticsearch.search.CCSDuelIT
331331
method: testTerminateAfter
332332
issue: https://github.com/elastic/elasticsearch/issues/126085
333-
- class: org.elasticsearch.search.sort.GeoDistanceIT
334-
method: testDistanceSortingWithUnmappedField
335-
issue: https://github.com/elastic/elasticsearch/issues/126118
336333
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
337334
method: testSearchWithRandomDisconnects
338335
issue: https://github.com/elastic/elasticsearch/issues/122707

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,19 @@ private static void consumePartialMergeResult(
303303
Collection<InternalAggregations> aggsList
304304
) {
305305
if (topDocsList != null) {
306-
topDocsList.add(partialResult.reducedTopDocs);
306+
addTopDocsToList(partialResult, topDocsList);
307307
}
308308
if (aggsList != null) {
309309
addAggsToList(partialResult, aggsList);
310310
}
311311
}
312312

313+
private static void addTopDocsToList(MergeResult partialResult, List<TopDocs> topDocsList) {
314+
if (partialResult.reducedTopDocs != null) {
315+
topDocsList.add(partialResult.reducedTopDocs);
316+
}
317+
}
318+
313319
private static void addAggsToList(MergeResult partialResult, Collection<InternalAggregations> aggsList) {
314320
var aggs = partialResult.reducedAggs;
315321
if (aggs != null) {
@@ -340,7 +346,7 @@ private MergeResult partialReduce(
340346
if (hasTopDocs) {
341347
topDocsList = new ArrayList<>(resultSetSize);
342348
if (lastMerge != null) {
343-
topDocsList.add(lastMerge.reducedTopDocs);
349+
addTopDocsToList(lastMerge, topDocsList);
344350
}
345351
} else {
346352
topDocsList = null;
@@ -358,7 +364,7 @@ private MergeResult partialReduce(
358364
}
359365
}
360366
// we have to merge here in the same way we collect on a shard
361-
newTopDocs = topDocsList == null ? Lucene.EMPTY_TOP_DOCS : mergeTopDocs(topDocsList, topNSize, 0);
367+
newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0);
362368
newAggs = hasAggs
363369
? aggregate(
364370
toConsume.iterator(),
@@ -647,7 +653,7 @@ private static void releaseAggs(List<QuerySearchResult> toConsume) {
647653

648654
record MergeResult(
649655
List<SearchShard> processedShards,
650-
TopDocs reducedTopDocs,
656+
@Nullable TopDocs reducedTopDocs,
651657
@Nullable InternalAggregations reducedAggs,
652658
long estimatedSize
653659
) implements Writeable {

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.HashMap;
6161
import java.util.List;
6262
import java.util.Map;
63+
import java.util.Objects;
6364
import java.util.concurrent.Executor;
6465
import java.util.function.BiFunction;
6566
import java.util.function.Consumer;
@@ -140,42 +141,31 @@ static SortedTopDocs sortDocs(
140141
}
141142

142143
static TopDocs mergeTopDocs(List<TopDocs> results, int topN, int from) {
143-
if (results.isEmpty()) {
144+
List<TopDocs> topDocsList = results.stream().filter(Objects::nonNull).toList();
145+
if (topDocsList.isEmpty()) {
144146
return null;
145147
}
146-
final TopDocs topDocs = results.getFirst();
147-
final TopDocs mergedTopDocs;
148-
final int numShards = results.size();
148+
final TopDocs topDocs = topDocsList.getFirst();
149+
final int numShards = topDocsList.size();
149150
if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
150151
return topDocs;
151-
} else if (topDocs instanceof TopFieldGroups firstTopDocs) {
152+
}
153+
final TopDocs mergedTopDocs;
154+
if (topDocs instanceof TopFieldGroups firstTopDocs) {
152155
final Sort sort = new Sort(firstTopDocs.fields);
153-
assert results.stream().noneMatch(topDoc -> topDoc == Lucene.EMPTY_TOP_DOCS);
154-
final TopFieldGroups[] shardTopDocs = results.toArray(TopFieldGroups[]::new);
156+
TopFieldGroups[] shardTopDocs = topDocsList.toArray(TopFieldGroups[]::new);
155157
mergedTopDocs = TopFieldGroups.merge(sort, from, topN, shardTopDocs, false);
156158
} else if (topDocs instanceof TopFieldDocs firstTopDocs) {
157-
final Sort sort = checkSameSortTypes(results, firstTopDocs.fields);
158-
final TopFieldDocs[] shardTopDocs = removeEmptyResults(results).toArray(TopFieldDocs[]::new);
159+
TopFieldDocs[] shardTopDocs = topDocsList.toArray(TopFieldDocs[]::new);
160+
final Sort sort = checkSameSortTypes(topDocsList, firstTopDocs.fields);
159161
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
160162
} else {
161-
final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
163+
final TopDocs[] shardTopDocs = topDocsList.toArray(new TopDocs[numShards]);
162164
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
163165
}
164166
return mergedTopDocs;
165167
}
166168

167-
private static <T extends TopDocs> List<T> removeEmptyResults(List<T> results) {
168-
List<T> nonEmptyResults = new ArrayList<>();
169-
for (T result : results) {
170-
if (result.totalHits.value() > 0 || result.totalHits.relation() != Relation.EQUAL_TO) {
171-
nonEmptyResults.add(result);
172-
} else {
173-
assert result.scoreDocs.length == 0;
174-
}
175-
}
176-
return nonEmptyResults;
177-
}
178-
179169
private static Sort checkSameSortTypes(Collection<TopDocs> results, SortField[] firstSortFields) {
180170
Sort sort = new Sort(firstSortFields);
181171
if (results.size() < 2) return sort;

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
2929
import org.elasticsearch.common.io.stream.Writeable;
30-
import org.elasticsearch.common.lucene.Lucene;
3130
import org.elasticsearch.common.util.concurrent.CountDown;
3231
import org.elasticsearch.common.util.concurrent.EsExecutors;
3332
import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -722,7 +721,7 @@ private static final class QueryPerNodeState {
722721

723722
private static final QueryPhaseResultConsumer.MergeResult EMPTY_PARTIAL_MERGE_RESULT = new QueryPhaseResultConsumer.MergeResult(
724723
List.of(),
725-
Lucene.EMPTY_TOP_DOCS,
724+
null,
726725
null,
727726
0L
728727
);
@@ -782,10 +781,12 @@ void onShardDone() {
782781
// also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other
783782
// indices without a roundtrip to the coordinating node
784783
final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size());
785-
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
786-
final int localIndex = scoreDoc.shardIndex;
787-
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
788-
relevantShardIndices.set(localIndex);
784+
if (mergeResult.reducedTopDocs() != null) {
785+
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
786+
final int localIndex = scoreDoc.shardIndex;
787+
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
788+
relevantShardIndices.set(localIndex);
789+
}
789790
}
790791
final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()];
791792
for (int i = 0; i < results.length; i++) {

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,9 @@ public static void writeTotalHits(StreamOutput out, TotalHits totalHits) throws
384384
* by shard for sorting purposes.
385385
*/
386386
public static void writeTopDocsIncludingShardIndex(StreamOutput out, TopDocs topDocs) throws IOException {
387-
if (topDocs instanceof TopFieldGroups topFieldGroups) {
387+
if (topDocs == null) {
388+
out.writeByte((byte) -1);
389+
} else if (topDocs instanceof TopFieldGroups topFieldGroups) {
388390
out.writeByte((byte) 2);
389391
writeTotalHits(out, topDocs.totalHits);
390392
out.writeString(topFieldGroups.field);
@@ -424,7 +426,9 @@ public static void writeSortFieldArray(StreamOutput out, SortField[] sortFields)
424426
*/
425427
public static TopDocs readTopDocsIncludingShardIndex(StreamInput in) throws IOException {
426428
byte type = in.readByte();
427-
if (type == 0) {
429+
if (type == -1) {
430+
return null;
431+
} else if (type == 0) {
428432
TotalHits totalHits = readTotalHits(in);
429433

430434
final int scoreDocCount = in.readVInt();

0 commit comments

Comments
 (0)