Skip to content

Commit 21563fa

Browse files
authored
Remove empty results before merging (#126770) (#127031)
We addressed the empty top docs issue with #126385 specifically for scenarios where empty top docs don't go through the wire. Yet they may be serialized from data node back to the coord node, in which case they will no longer be equal to Lucene#EMPTY_TOP_DOCS. This commit expands the existing filtering of empty top docs to include also those that did go through serialization. Closes #126742
1 parent 57e082e commit 21563fa

File tree

6 files changed

+48
-19
lines changed

6 files changed

+48
-19
lines changed

docs/changelog/126770.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126770
2+
summary: Remove empty results before merging
3+
area: Search
4+
type: bug
5+
issues:
6+
- 126742

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ static TransportVersion def(int id) {
206206
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS_8_19 = def(8_841_0_17);
207207
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG_8_19 = def(8_841_0_18);
208208
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION_BACKPORT_8_X = def(8_841_0_19);
209+
public static final TransportVersion SEARCH_INCREMENTAL_TOP_DOCS_NULL_BACKPORT_8_19 = def(8_841_0_20);
209210

210211
/*
211212
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,19 @@ private static void consumePartialMergeResult(
303303
Collection<InternalAggregations> aggsList
304304
) {
305305
if (topDocsList != null) {
306-
topDocsList.add(partialResult.reducedTopDocs);
306+
addTopDocsToList(partialResult, topDocsList);
307307
}
308308
if (aggsList != null) {
309309
addAggsToList(partialResult, aggsList);
310310
}
311311
}
312312

313+
private static void addTopDocsToList(MergeResult partialResult, List<TopDocs> topDocsList) {
314+
if (partialResult.reducedTopDocs != null) {
315+
topDocsList.add(partialResult.reducedTopDocs);
316+
}
317+
}
318+
313319
private static void addAggsToList(MergeResult partialResult, Collection<InternalAggregations> aggsList) {
314320
var aggs = partialResult.reducedAggs;
315321
if (aggs != null) {
@@ -340,7 +346,7 @@ private MergeResult partialReduce(
340346
if (hasTopDocs) {
341347
topDocsList = new ArrayList<>(resultSetSize);
342348
if (lastMerge != null) {
343-
topDocsList.add(lastMerge.reducedTopDocs);
349+
addTopDocsToList(lastMerge, topDocsList);
344350
}
345351
} else {
346352
topDocsList = null;
@@ -358,7 +364,7 @@ private MergeResult partialReduce(
358364
}
359365
}
360366
// we have to merge here in the same way we collect on a shard
361-
newTopDocs = topDocsList == null ? Lucene.EMPTY_TOP_DOCS : mergeTopDocs(topDocsList, topNSize, 0);
367+
newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0);
362368
newAggs = hasAggs
363369
? aggregate(
364370
toConsume.iterator(),
@@ -647,7 +653,7 @@ private static void releaseAggs(List<QuerySearchResult> toConsume) {
647653

648654
record MergeResult(
649655
List<SearchShard> processedShards,
650-
TopDocs reducedTopDocs,
656+
@Nullable TopDocs reducedTopDocs,
651657
@Nullable InternalAggregations reducedAggs,
652658
long estimatedSize
653659
) implements Writeable {

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.HashMap;
6161
import java.util.List;
6262
import java.util.Map;
63+
import java.util.Objects;
6364
import java.util.concurrent.Executor;
6465
import java.util.function.BiFunction;
6566
import java.util.function.Consumer;
@@ -140,24 +141,26 @@ static SortedTopDocs sortDocs(
140141
}
141142

142143
static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
143-
if (results.isEmpty()) {
144+
List<TopDocs> topDocsList = results.stream().filter(Objects::nonNull).toList();
145+
if (topDocsList.isEmpty()) {
144146
return null;
145147
}
146-
final TopDocs topDocs = results.stream().findFirst().get();
147-
final TopDocs mergedTopDocs;
148+
final TopDocs topDocs = topDocsList.stream().findFirst().get();
148149
final int numShards = results.size();
149150
if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
150151
return topDocs;
151-
} else if (topDocs instanceof TopFieldGroups firstTopDocs) {
152+
}
153+
final TopDocs mergedTopDocs;
154+
if (topDocs instanceof TopFieldGroups firstTopDocs) {
152155
final Sort sort = new Sort(firstTopDocs.fields);
153-
final TopFieldGroups[] shardTopDocs = results.stream().filter(td -> td != Lucene.EMPTY_TOP_DOCS).toArray(TopFieldGroups[]::new);
156+
TopFieldGroups[] shardTopDocs = topDocsList.toArray(new TopFieldGroups[0]);
154157
mergedTopDocs = TopFieldGroups.merge(sort, from, topN, shardTopDocs, false);
155158
} else if (topDocs instanceof TopFieldDocs firstTopDocs) {
156-
final Sort sort = checkSameSortTypes(results, firstTopDocs.fields);
157-
final TopFieldDocs[] shardTopDocs = results.stream().filter((td -> td != Lucene.EMPTY_TOP_DOCS)).toArray(TopFieldDocs[]::new);
159+
TopFieldDocs[] shardTopDocs = topDocsList.toArray(new TopFieldDocs[0]);
160+
final Sort sort = checkSameSortTypes(topDocsList, firstTopDocs.fields);
158161
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
159162
} else {
160-
final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
163+
final TopDocs[] shardTopDocs = topDocsList.toArray(new TopDocs[0]);
161164
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
162165
}
163166
return mergedTopDocs;

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
2929
import org.elasticsearch.common.io.stream.Writeable;
30-
import org.elasticsearch.common.lucene.Lucene;
3130
import org.elasticsearch.common.util.concurrent.CountDown;
3231
import org.elasticsearch.common.util.concurrent.EsExecutors;
3332
import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -720,7 +719,7 @@ private static final class QueryPerNodeState {
720719

721720
private static final QueryPhaseResultConsumer.MergeResult EMPTY_PARTIAL_MERGE_RESULT = new QueryPhaseResultConsumer.MergeResult(
722721
List.of(),
723-
Lucene.EMPTY_TOP_DOCS,
722+
null,
724723
null,
725724
0L
726725
);
@@ -780,10 +779,12 @@ void onShardDone() {
780779
// also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other
781780
// indices without a roundtrip to the coordinating node
782781
final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size());
783-
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
784-
final int localIndex = scoreDoc.shardIndex;
785-
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
786-
relevantShardIndices.set(localIndex);
782+
if (mergeResult.reducedTopDocs() != null) {
783+
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
784+
final int localIndex = scoreDoc.shardIndex;
785+
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
786+
relevantShardIndices.set(localIndex);
787+
}
787788
}
788789
final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()];
789790
for (int i = 0; i < results.length; i++) {

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.lucene.util.BytesRef;
6565
import org.apache.lucene.util.Version;
6666
import org.elasticsearch.ExceptionsHelper;
67+
import org.elasticsearch.TransportVersions;
6768
import org.elasticsearch.common.Strings;
6869
import org.elasticsearch.common.io.stream.StreamInput;
6970
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -383,6 +384,14 @@ public static void writeTotalHits(StreamOutput out, TotalHits totalHits) throws
383384
* by shard for sorting purposes.
384385
*/
385386
public static void writeTopDocsIncludingShardIndex(StreamOutput out, TopDocs topDocs) throws IOException {
387+
if (topDocs == null) {
388+
if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_INCREMENTAL_TOP_DOCS_NULL_BACKPORT_8_19)) {
389+
out.writeByte((byte) -1);
390+
return;
391+
} else {
392+
topDocs = Lucene.EMPTY_TOP_DOCS;
393+
}
394+
}
386395
if (topDocs instanceof TopFieldGroups topFieldGroups) {
387396
out.writeByte((byte) 2);
388397
writeTotalHits(out, topDocs.totalHits);
@@ -423,7 +432,10 @@ public static void writeSortFieldArray(StreamOutput out, SortField[] sortFields)
423432
*/
424433
public static TopDocs readTopDocsIncludingShardIndex(StreamInput in) throws IOException {
425434
byte type = in.readByte();
426-
if (type == 0) {
435+
if (type == -1) {
436+
assert in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_INCREMENTAL_TOP_DOCS_NULL_BACKPORT_8_19);
437+
return null;
438+
} else if (type == 0) {
427439
TotalHits totalHits = readTotalHits(in);
428440

429441
final int scoreDocCount = in.readVInt();

0 commit comments

Comments
 (0)