Skip to content

Commit 0873098

Browse files
committed
Add subsidiary failures to search responses
1 parent a895875 commit 0873098

File tree

29 files changed

+338
-98
lines changed

29 files changed

+338
-98
lines changed

client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.action.search.ShardSearchFailure;
15+
import org.elasticsearch.action.search.SubsidiaryFailure;
1516
import org.elasticsearch.action.support.ActionFilters;
1617
import org.elasticsearch.action.support.HandledTransportAction;
17-
import org.elasticsearch.common.io.stream.Writeable;
1818
import org.elasticsearch.common.util.concurrent.EsExecutors;
1919
import org.elasticsearch.injection.guice.Inject;
2020
import org.elasticsearch.plugin.noop.NoopPlugin;
@@ -34,7 +34,7 @@ public TransportNoopSearchAction(TransportService transportService, ActionFilter
3434
NoopPlugin.NOOP_SEARCH_ACTION.name(),
3535
transportService,
3636
actionFilters,
37-
(Writeable.Reader<SearchRequest>) SearchRequest::new,
37+
SearchRequest::new,
3838
EsExecutors.DIRECT_EXECUTOR_SERVICE
3939
);
4040
}
@@ -56,6 +56,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
5656
0,
5757
0,
5858
ShardSearchFailure.EMPTY_ARRAY,
59+
SubsidiaryFailure.EMPTY_ARRAY,
5960
SearchResponse.Clusters.EMPTY
6061
)
6162
);

docs/changelog/122788.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 122788
2+
summary: Add subsidiary failures to search responses
3+
area: Ranking
4+
type: enhancement
5+
issues:
6+
- 116796

modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/SearchTemplateResponseTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.search.TotalHits;
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.action.search.ShardSearchFailure;
15+
import org.elasticsearch.action.search.SubsidiaryFailure;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.common.xcontent.XContentHelper;
1718
import org.elasticsearch.search.SearchHit;
@@ -201,6 +202,7 @@ public void testSearchResponseToXContent() throws IOException {
201202
0,
202203
0,
203204
ShardSearchFailure.EMPTY_ARRAY,
205+
SubsidiaryFailure.EMPTY_ARRAY,
204206
SearchResponse.Clusters.EMPTY
205207
);
206208

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ static TransportVersion def(int id) {
201201
public static final TransportVersion REMOVE_REPOSITORY_CONFLICT_MESSAGE = def(9_012_0_00);
202202
public static final TransportVersion RERANKER_FAILURES_ALLOWED = def(9_013_0_00);
203203
public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED = def(9_014_0_00);
204+
public static final TransportVersion SEARCH_SUBSIDIARY_FAILURES = def(9_015_0_00);
205+
204206
/*
205207
* STOP! READ THIS FIRST! No, really,
206208
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import java.util.ArrayList;
4545
import java.util.Arrays;
46+
import java.util.Collections;
4647
import java.util.List;
4748
import java.util.Map;
4849
import java.util.concurrent.ConcurrentHashMap;
@@ -91,6 +92,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9192
private final AtomicInteger successfulOps = new AtomicInteger();
9293
private final SearchTimeProvider timeProvider;
9394
private final SearchResponse.Clusters clusters;
95+
private final List<SubsidiaryFailure> subsidiaryFailures = Collections.synchronizedList(new ArrayList<>());
9496

9597
protected final List<SearchShardIterator> toSkipShardsIts;
9698
protected final List<SearchShardIterator> shardsIts;
@@ -573,6 +575,7 @@ public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
573575
private SearchResponse buildSearchResponse(
574576
SearchResponseSections internalSearchResponse,
575577
ShardSearchFailure[] failures,
578+
SubsidiaryFailure[] subsidiaryFailures,
576579
String scrollId,
577580
BytesReference searchContextId
578581
) {
@@ -588,6 +591,7 @@ private SearchResponse buildSearchResponse(
588591
toSkipShardsIts.size(),
589592
buildTookInMillis(),
590593
failures,
594+
subsidiaryFailures,
591595
clusters,
592596
searchContextId
593597
);
@@ -623,7 +627,13 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
623627
searchContextId = null;
624628
}
625629
}
626-
ActionListener.respondAndRelease(listener, buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
630+
631+
SubsidiaryFailure[] subFailures = subsidiaryFailures.toArray(SubsidiaryFailure.EMPTY_ARRAY);
632+
633+
ActionListener.respondAndRelease(
634+
listener,
635+
buildSearchResponse(internalSearchResponse, failures, subFailures, scrollId, searchContextId)
636+
);
627637
}
628638
}
629639

@@ -661,6 +671,10 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
661671
listener.onFailure(exception);
662672
}
663673

674+
public void addSubsidiaryFailure(String description, Exception exception) {
675+
subsidiaryFailures.add(new SubsidiaryFailure(description, exception));
676+
}
677+
664678
/**
665679
* Releases a search context with the given context ID on the node the given connection is connected to.
666680
* @see org.elasticsearch.search.query.QuerySearchResult#getContextId()

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,10 @@ public void onResponse(RankFeatureDoc[] docsWithUpdatedScores) {
199199
@Override
200200
public void onFailure(Exception e) {
201201
if (rankFeaturePhaseRankCoordinatorContext.failuresAllowed()) {
202-
// TODO: handle the exception somewhere
203202
// don't want to log the entire stack trace, it's not helpful here
204203
logger.warn("Exception computing updated ranks, continuing with existing ranks: {}", e.toString());
204+
context.addSubsidiaryFailure("reranking", e);
205+
205206
// use the existing score docs as-is
206207
// downstream things expect every doc to have a score, so we need to infer a score here
207208
// if the doc doesn't otherwise have one. We can use the rank to infer a possible score instead (1/rank).

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.io.stream.StreamOutput;
2020
import org.elasticsearch.common.io.stream.Writeable;
2121
import org.elasticsearch.common.lucene.Lucene;
22+
import org.elasticsearch.common.util.CollectionUtils;
2223
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2324
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
2425
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
@@ -53,8 +54,6 @@
5354
import java.util.function.Predicate;
5455
import java.util.function.Supplier;
5556

56-
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
57-
5857
/**
5958
* A response of a search request.
6059
*/
@@ -70,6 +69,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
7069
public static final ParseField TIMED_OUT = new ParseField("timed_out");
7170
public static final ParseField TERMINATED_EARLY = new ParseField("terminated_early");
7271
public static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases");
72+
public static final ParseField SUBSIDIARY_FAILURES = new ParseField("subsidiaryFailures");
7373

7474
private final SearchHits hits;
7575
private final InternalAggregations aggregations;
@@ -84,6 +84,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
8484
private final int successfulShards;
8585
private final int skippedShards;
8686
private final ShardSearchFailure[] shardFailures;
87+
private final SubsidiaryFailure[] subsidiaryFailures;
8788
private final Clusters clusters;
8889
private final long tookInMillis;
8990

@@ -100,14 +101,17 @@ public SearchResponse(StreamInput in) throws IOException {
100101
this.numReducePhases = in.readVInt();
101102
totalShards = in.readVInt();
102103
successfulShards = in.readVInt();
103-
int size = in.readVInt();
104-
if (size == 0) {
105-
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
104+
shardFailures = in.readArray(
105+
ShardSearchFailure::readShardSearchFailure,
106+
s -> s == 0 ? ShardSearchFailure.EMPTY_ARRAY : new ShardSearchFailure[s]
107+
);
108+
if (in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_SUBSIDIARY_FAILURES)) {
109+
subsidiaryFailures = in.readArray(
110+
SubsidiaryFailure::new,
111+
s -> s == 0 ? SubsidiaryFailure.EMPTY_ARRAY : new SubsidiaryFailure[s]
112+
);
106113
} else {
107-
shardFailures = new ShardSearchFailure[size];
108-
for (int i = 0; i < shardFailures.length; i++) {
109-
shardFailures[i] = readShardSearchFailure(in);
110-
}
114+
subsidiaryFailures = SubsidiaryFailure.EMPTY_ARRAY;
111115
}
112116
clusters = new Clusters(in);
113117
scrollId = in.readOptionalString();
@@ -130,6 +134,7 @@ public SearchResponse(
130134
int skippedShards,
131135
long tookInMillis,
132136
ShardSearchFailure[] shardFailures,
137+
SubsidiaryFailure[] subsidiaryFailures,
133138
Clusters clusters
134139
) {
135140
this(
@@ -146,6 +151,7 @@ public SearchResponse(
146151
skippedShards,
147152
tookInMillis,
148153
shardFailures,
154+
subsidiaryFailures,
149155
clusters,
150156
null
151157
);
@@ -159,6 +165,7 @@ public SearchResponse(
159165
int skippedShards,
160166
long tookInMillis,
161167
ShardSearchFailure[] shardFailures,
168+
SubsidiaryFailure[] subsidiaryFailures,
162169
Clusters clusters,
163170
BytesReference pointInTimeId
164171
) {
@@ -176,6 +183,7 @@ public SearchResponse(
176183
skippedShards,
177184
tookInMillis,
178185
shardFailures,
186+
subsidiaryFailures,
179187
clusters,
180188
pointInTimeId
181189
);
@@ -195,6 +203,7 @@ public SearchResponse(
195203
int skippedShards,
196204
long tookInMillis,
197205
ShardSearchFailure[] shardFailures,
206+
SubsidiaryFailure[] subsidiaryFailures,
198207
Clusters clusters,
199208
BytesReference pointInTimeId
200209
) {
@@ -214,6 +223,7 @@ public SearchResponse(
214223
this.skippedShards = skippedShards;
215224
this.tookInMillis = tookInMillis;
216225
this.shardFailures = shardFailures;
226+
this.subsidiaryFailures = subsidiaryFailures;
217227
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
218228
assert scrollId == null || pointInTimeId == null
219229
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
@@ -339,7 +349,14 @@ public int getFailedShards() {
339349
* The failures that occurred during the search.
340350
*/
341351
public ShardSearchFailure[] getShardFailures() {
342-
return this.shardFailures;
352+
return shardFailures;
353+
}
354+
355+
/**
356+
* The failures that occurred that did not stop results from being returned.
357+
*/
358+
public SubsidiaryFailure[] getSubsidiaryFailures() {
359+
return subsidiaryFailures;
343360
}
344361

345362
/**
@@ -392,6 +409,9 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
392409
public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params params) {
393410
return Iterators.concat(
394411
ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent),
412+
CollectionUtils.isEmpty(subsidiaryFailures)
413+
? Collections.emptyIterator()
414+
: ChunkedToXContentHelper.array(SUBSIDIARY_FAILURES.getPreferredName(), Iterators.forArray(subsidiaryFailures)),
395415
Iterators.single(clusters),
396416
Iterators.concat(
397417
hits.toXContentChunked(params),
@@ -444,10 +464,9 @@ public void writeTo(StreamOutput out) throws IOException {
444464
out.writeVInt(numReducePhases);
445465
out.writeVInt(totalShards);
446466
out.writeVInt(successfulShards);
447-
448-
out.writeVInt(shardFailures.length);
449-
for (ShardSearchFailure shardSearchFailure : shardFailures) {
450-
shardSearchFailure.writeTo(out);
467+
out.writeArray(shardFailures);
468+
if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_SUBSIDIARY_FAILURES)) {
469+
out.writeArray(subsidiaryFailures);
451470
}
452471
clusters.writeTo(out);
453472
out.writeOptionalString(scrollId);
@@ -1162,6 +1181,7 @@ public static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters
11621181
0,
11631182
tookInMillisSupplier.get(),
11641183
ShardSearchFailure.EMPTY_ARRAY,
1184+
SubsidiaryFailure.EMPTY_ARRAY,
11651185
clusters,
11661186
null
11671187
);

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
129129
// the current reduce phase counts as one
130130
int numReducePhases = 1;
131131
List<ShardSearchFailure> failures = new ArrayList<>();
132+
List<SubsidiaryFailure> subsidiaryFailures = new ArrayList<>();
132133
Map<String, SearchProfileShardResult> profileResults = new HashMap<>();
133134
List<InternalAggregations> aggs = new ArrayList<>();
134135
Map<ShardIdAndClusterAlias, Integer> shards = new TreeMap<>();
@@ -145,6 +146,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
145146
numReducePhases += searchResponse.getNumReducePhases();
146147

147148
Collections.addAll(failures, searchResponse.getShardFailures());
149+
Collections.addAll(subsidiaryFailures, searchResponse.getSubsidiaryFailures());
148150

149151
profileResults.putAll(searchResponse.getProfileResults());
150152

@@ -213,6 +215,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
213215
? InternalAggregations.EMPTY
214216
: InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forFinalReduction());
215217
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
218+
SubsidiaryFailure[] subFailures = subsidiaryFailures.toArray(SubsidiaryFailure.EMPTY_ARRAY);
216219
SearchProfileResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileResults(profileResults);
217220
// make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
218221
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
@@ -231,6 +234,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
231234
skippedShards,
232235
tookInMillis,
233236
shardFailures,
237+
subFailures,
234238
clusters,
235239
null
236240
);

server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> {
5555
protected final SearchTransportService searchTransportService;
5656
private final long startTime;
5757
private final List<ShardSearchFailure> shardFailures = new ArrayList<>();
58+
private final List<SubsidiaryFailure> subsidiaryFailures = new ArrayList<>();
5859
private final AtomicInteger successfulOps;
5960

6061
protected SearchScrollAsyncAction(
@@ -213,6 +214,10 @@ synchronized ShardSearchFailure[] buildShardFailures() { // pkg private for test
213214
return shardFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
214215
}
215216

217+
synchronized SubsidiaryFailure[] buildSubsidiaryFailures() {
218+
return subsidiaryFailures.toArray(SubsidiaryFailure.EMPTY_ARRAY);
219+
}
220+
216221
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
217222
// we simply try and return as much as possible
218223
private synchronized void addShardFailure(ShardSearchFailure failure) {
@@ -263,6 +268,7 @@ protected final void sendResponse(
263268
0,
264269
buildTookInMillis(),
265270
buildShardFailures(),
271+
buildSubsidiaryFailures(),
266272
SearchResponse.Clusters.EMPTY,
267273
null
268274
)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.ElasticsearchException;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.xcontent.ToXContentObject;
17+
import org.elasticsearch.xcontent.XContentBuilder;
18+
19+
import java.io.IOException;
20+
21+
public record SubsidiaryFailure(String phase, Exception failure) implements ToXContentObject, Writeable {
22+
public static final String PHASE_FIELD = "phase";
23+
public static final String FAILURE_FIELD = "failure";
24+
25+
public static final SubsidiaryFailure[] EMPTY_ARRAY = new SubsidiaryFailure[0];
26+
27+
public SubsidiaryFailure(StreamInput in) throws IOException {
28+
this(in.readString(), in.readException());
29+
}
30+
31+
@Override
32+
public void writeTo(StreamOutput out) throws IOException {
33+
out.writeString(phase);
34+
out.writeException(failure);
35+
}
36+
37+
@Override
38+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
39+
builder.startObject();
40+
builder.field(PHASE_FIELD, phase);
41+
builder.startObject(FAILURE_FIELD);
42+
ElasticsearchException.generateThrowableXContent(builder, params, failure);
43+
builder.endObject();
44+
builder.endObject();
45+
return builder;
46+
}
47+
}

0 commit comments

Comments
 (0)