Skip to content

Commit 9024e22

Browse files
committed
Add subsidiary failures to search responses
1 parent fc2f8fc commit 9024e22

File tree

59 files changed

+288
-24
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+288
-24
lines changed

client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.action.search.ShardSearchFailure;
15+
import org.elasticsearch.action.search.SubsidiaryFailure;
1516
import org.elasticsearch.action.support.ActionFilters;
1617
import org.elasticsearch.action.support.HandledTransportAction;
1718
import org.elasticsearch.common.io.stream.Writeable;
@@ -56,6 +57,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
5657
0,
5758
0,
5859
ShardSearchFailure.EMPTY_ARRAY,
60+
SubsidiaryFailure.EMPTY_ARRAY,
5961
SearchResponse.Clusters.EMPTY
6062
)
6163
);

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk)
341341
}
342342

343343
SearchHits hits = SearchHits.unpooled(new SearchHit[] { hit }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f);
344-
SearchResponse searchResponse = new SearchResponse(hits, null, null, false, null, null, 0, null, 1, 1, 0, 1L, null, null);
344+
SearchResponse searchResponse = new SearchResponse(hits, null, null, false, null, null, 0, null, 1, 1, 0, 1L, null, null, null);
345345
toRelease.add(searchResponse::decRef);
346346
@SuppressWarnings("unchecked")
347347
ActionFuture<SearchResponse> actionFuture = mock(ActionFuture.class);

modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/SearchTemplateResponseTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.search.TotalHits;
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.action.search.ShardSearchFailure;
15+
import org.elasticsearch.action.search.SubsidiaryFailure;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.common.xcontent.XContentHelper;
1718
import org.elasticsearch.search.SearchHit;
@@ -201,6 +202,7 @@ public void testSearchResponseToXContent() throws IOException {
201202
0,
202203
0,
203204
ShardSearchFailure.EMPTY_ARRAY,
205+
SubsidiaryFailure.EMPTY_ARRAY,
204206
SearchResponse.Clusters.EMPTY
205207
);
206208

modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ protected RequestWrapper<?> buildRequest(Hit doc) {
588588
0,
589589
randomLong(),
590590
null,
591+
null,
591592
SearchResponse.Clusters.EMPTY
592593
);
593594
try {

modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ private SearchResponse createSearchResponse() {
180180
0,
181181
randomLong(),
182182
null,
183+
null,
183184
SearchResponse.Clusters.EMPTY
184185
);
185186
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ static TransportVersion def(int id) {
198198
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);
199199
public static final TransportVersion REMOVE_REPOSITORY_CONFLICT_MESSAGE = def(9_012_0_00);
200200
public static final TransportVersion RERANKER_FAILURES_ALLOWED = def(9_013_0_00);
201+
public static final TransportVersion SEARCH_SUBSIDIARY_FAILURES = def(9_014_0_00);
201202

202203
/*
203204
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import java.util.ArrayList;
4545
import java.util.Arrays;
46+
import java.util.Collections;
4647
import java.util.List;
4748
import java.util.Map;
4849
import java.util.concurrent.ConcurrentHashMap;
@@ -91,6 +92,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9192
private final AtomicInteger successfulOps = new AtomicInteger();
9293
private final SearchTimeProvider timeProvider;
9394
private final SearchResponse.Clusters clusters;
95+
private final List<SubsidiaryFailure> subsidiaryFailures = Collections.synchronizedList(new ArrayList<>());
9496

9597
protected final List<SearchShardIterator> toSkipShardsIts;
9698
protected final List<SearchShardIterator> shardsIts;
@@ -573,6 +575,7 @@ public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
573575
private SearchResponse buildSearchResponse(
574576
SearchResponseSections internalSearchResponse,
575577
ShardSearchFailure[] failures,
578+
SubsidiaryFailure[] subsidiaryFailures,
576579
String scrollId,
577580
BytesReference searchContextId
578581
) {
@@ -588,6 +591,7 @@ private SearchResponse buildSearchResponse(
588591
toSkipShardsIts.size(),
589592
buildTookInMillis(),
590593
failures,
594+
subsidiaryFailures,
591595
clusters,
592596
searchContextId
593597
);
@@ -623,7 +627,13 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
623627
searchContextId = null;
624628
}
625629
}
626-
ActionListener.respondAndRelease(listener, buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
630+
631+
SubsidiaryFailure[] subFailures = subsidiaryFailures.toArray(SubsidiaryFailure.EMPTY_ARRAY);
632+
633+
ActionListener.respondAndRelease(
634+
listener,
635+
buildSearchResponse(internalSearchResponse, failures, subFailures, scrollId, searchContextId)
636+
);
627637
}
628638
}
629639

@@ -661,6 +671,10 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
661671
listener.onFailure(exception);
662672
}
663673

674+
public void addSubsidiaryFailure(String description, Exception exception) {
675+
subsidiaryFailures.add(new SubsidiaryFailure(description, exception));
676+
}
677+
664678
/**
665679
* Releases a search context with the given context ID on the node the given connection is connected to.
666680
* @see org.elasticsearch.search.query.QuerySearchResult#getContextId()

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,10 @@ public void onResponse(RankFeatureDoc[] docsWithUpdatedScores) {
199199
@Override
200200
public void onFailure(Exception e) {
201201
if (rankFeaturePhaseRankCoordinatorContext.failuresAllowed()) {
202-
// TODO: handle the exception somewhere
203202
// don't want to log the entire stack trace, it's not helpful here
204203
logger.warn("Exception computing updated ranks, continuing with existing ranks: {}", e.toString());
204+
context.addSubsidiaryFailure("reranking", e);
205+
205206
// use the existing score docs as-is
206207
// downstream things expect every doc to have a score, so we need to infer a score here
207208
// if the doc doesn't otherwise have one. We can use the rank to infer a possible score instead (1/rank).

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@
5353
import java.util.function.Predicate;
5454
import java.util.function.Supplier;
5555

56-
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
57-
5856
/**
5957
* A response of a search request.
6058
*/
@@ -70,6 +68,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
7068
public static final ParseField TIMED_OUT = new ParseField("timed_out");
7169
public static final ParseField TERMINATED_EARLY = new ParseField("terminated_early");
7270
public static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases");
71+
public static final ParseField SUBSIDIARY_FAILURES = new ParseField("subsidiaryFailures");
7372

7473
private final SearchHits hits;
7574
private final InternalAggregations aggregations;
@@ -84,6 +83,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
8483
private final int successfulShards;
8584
private final int skippedShards;
8685
private final ShardSearchFailure[] shardFailures;
86+
private final SubsidiaryFailure[] subsidiaryFailures;
8787
private final Clusters clusters;
8888
private final long tookInMillis;
8989

@@ -100,14 +100,17 @@ public SearchResponse(StreamInput in) throws IOException {
100100
this.numReducePhases = in.readVInt();
101101
totalShards = in.readVInt();
102102
successfulShards = in.readVInt();
103-
int size = in.readVInt();
104-
if (size == 0) {
105-
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
103+
shardFailures = in.readArray(
104+
ShardSearchFailure::readShardSearchFailure,
105+
s -> s == 0 ? ShardSearchFailure.EMPTY_ARRAY : new ShardSearchFailure[s]
106+
);
107+
if (in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_SUBSIDIARY_FAILURES)) {
108+
subsidiaryFailures = in.readArray(
109+
SubsidiaryFailure::new,
110+
s -> s == 0 ? SubsidiaryFailure.EMPTY_ARRAY : new SubsidiaryFailure[s]
111+
);
106112
} else {
107-
shardFailures = new ShardSearchFailure[size];
108-
for (int i = 0; i < shardFailures.length; i++) {
109-
shardFailures[i] = readShardSearchFailure(in);
110-
}
113+
subsidiaryFailures = SubsidiaryFailure.EMPTY_ARRAY;
111114
}
112115
clusters = new Clusters(in);
113116
scrollId = in.readOptionalString();
@@ -130,6 +133,7 @@ public SearchResponse(
130133
int skippedShards,
131134
long tookInMillis,
132135
ShardSearchFailure[] shardFailures,
136+
SubsidiaryFailure[] subsidiaryFailures,
133137
Clusters clusters
134138
) {
135139
this(
@@ -146,6 +150,7 @@ public SearchResponse(
146150
skippedShards,
147151
tookInMillis,
148152
shardFailures,
153+
subsidiaryFailures,
149154
clusters,
150155
null
151156
);
@@ -159,6 +164,7 @@ public SearchResponse(
159164
int skippedShards,
160165
long tookInMillis,
161166
ShardSearchFailure[] shardFailures,
167+
SubsidiaryFailure[] subsidiaryFailures,
162168
Clusters clusters,
163169
BytesReference pointInTimeId
164170
) {
@@ -176,6 +182,7 @@ public SearchResponse(
176182
skippedShards,
177183
tookInMillis,
178184
shardFailures,
185+
subsidiaryFailures,
179186
clusters,
180187
pointInTimeId
181188
);
@@ -195,6 +202,7 @@ public SearchResponse(
195202
int skippedShards,
196203
long tookInMillis,
197204
ShardSearchFailure[] shardFailures,
205+
SubsidiaryFailure[] subsidiaryFailures,
198206
Clusters clusters,
199207
BytesReference pointInTimeId
200208
) {
@@ -214,6 +222,7 @@ public SearchResponse(
214222
this.skippedShards = skippedShards;
215223
this.tookInMillis = tookInMillis;
216224
this.shardFailures = shardFailures;
225+
this.subsidiaryFailures = subsidiaryFailures;
217226
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
218227
assert scrollId == null || pointInTimeId == null
219228
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
@@ -339,7 +348,14 @@ public int getFailedShards() {
339348
* The failures that occurred during the search.
340349
*/
341350
public ShardSearchFailure[] getShardFailures() {
342-
return this.shardFailures;
351+
return shardFailures;
352+
}
353+
354+
/**
355+
* The failures that occurred that did not stop results from being returned.
356+
*/
357+
public SubsidiaryFailure[] getSubsidiaryFailures() {
358+
return subsidiaryFailures;
343359
}
344360

345361
/**
@@ -392,6 +408,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
392408
public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params params) {
393409
return Iterators.concat(
394410
ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent),
411+
ChunkedToXContentHelper.array(SUBSIDIARY_FAILURES.getPreferredName(), Iterators.forArray(subsidiaryFailures)),
395412
Iterators.single(clusters),
396413
Iterators.concat(
397414
hits.toXContentChunked(params),
@@ -444,10 +461,9 @@ public void writeTo(StreamOutput out) throws IOException {
444461
out.writeVInt(numReducePhases);
445462
out.writeVInt(totalShards);
446463
out.writeVInt(successfulShards);
447-
448-
out.writeVInt(shardFailures.length);
449-
for (ShardSearchFailure shardSearchFailure : shardFailures) {
450-
shardSearchFailure.writeTo(out);
464+
out.writeArray(shardFailures);
465+
if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_SUBSIDIARY_FAILURES)) {
466+
out.writeArray(subsidiaryFailures);
451467
}
452468
clusters.writeTo(out);
453469
out.writeOptionalString(scrollId);
@@ -1162,6 +1178,7 @@ public static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters
11621178
0,
11631179
tookInMillisSupplier.get(),
11641180
ShardSearchFailure.EMPTY_ARRAY,
1181+
SubsidiaryFailure.EMPTY_ARRAY,
11651182
clusters,
11661183
null
11671184
);

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
129129
// the current reduce phase counts as one
130130
int numReducePhases = 1;
131131
List<ShardSearchFailure> failures = new ArrayList<>();
132+
List<SubsidiaryFailure> subsidiaryFailures = new ArrayList<>();
132133
Map<String, SearchProfileShardResult> profileResults = new HashMap<>();
133134
List<InternalAggregations> aggs = new ArrayList<>();
134135
Map<ShardIdAndClusterAlias, Integer> shards = new TreeMap<>();
@@ -145,6 +146,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
145146
numReducePhases += searchResponse.getNumReducePhases();
146147

147148
Collections.addAll(failures, searchResponse.getShardFailures());
149+
Collections.addAll(subsidiaryFailures, searchResponse.getSubsidiaryFailures());
148150

149151
profileResults.putAll(searchResponse.getProfileResults());
150152

@@ -213,6 +215,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
213215
? InternalAggregations.EMPTY
214216
: InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forFinalReduction());
215217
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
218+
SubsidiaryFailure[] subFailures = subsidiaryFailures.toArray(SubsidiaryFailure.EMPTY_ARRAY);
216219
SearchProfileResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileResults(profileResults);
217220
// make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
218221
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
@@ -231,6 +234,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
231234
skippedShards,
232235
tookInMillis,
233236
shardFailures,
237+
subFailures,
234238
clusters,
235239
null
236240
);

0 commit comments

Comments
 (0)