Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
package org.elasticsearch.plugin.noop.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.PhaseFailure;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.plugin.noop.NoopPlugin;
Expand All @@ -34,7 +34,7 @@ public TransportNoopSearchAction(TransportService transportService, ActionFilter
NoopPlugin.NOOP_SEARCH_ACTION.name(),
transportService,
actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new,
SearchRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
Expand All @@ -56,6 +56,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
PhaseFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
)
);
Expand Down
6 changes: 6 additions & 0 deletions docs/changelog/122788.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 122788
summary: Add phase failures to search responses
area: Ranking
type: enhancement
issues:
- 116796
5 changes: 5 additions & 0 deletions docs/reference/elasticsearch/rest-apis/retrievers.md
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,11 @@ score = ln(score), if score < 0

Applies the specified [boolean query filter](/reference/query-languages/query-dsl-bool-query.md) to the child `retriever`. If the child retriever already specifies any filters, then this top-level filter is applied in conjunction with the filter defined in the child retriever.

`allow_rerank_failures`
: (Optional, `boolean`)

If `true`, a failure during reranking will not fail the whole query. Instead, the documents from the preceding steps are passed through unmodified to subsequent retriever steps. The failure exception will be available in the `phase_failures` field of the search response.
Defaults to `false`.


### Example: Elastic Rerank [text-similarity-reranker-retriever-example-elastic-rerank]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.script.mustache;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.PhaseFailure;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -201,6 +202,7 @@ public void testSearchResponseToXContent() throws IOException {
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
PhaseFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
public static final TransportVersion SEARCH_SUBSIDIARY_FAILURES = def(9_026_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -90,6 +91,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final AtomicInteger successfulOps;
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;
private final List<PhaseFailure> phaseFailures = Collections.synchronizedList(new ArrayList<>());

protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
Expand Down Expand Up @@ -573,6 +575,7 @@ public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
private SearchResponse buildSearchResponse(
SearchResponseSections internalSearchResponse,
ShardSearchFailure[] failures,
PhaseFailure[] phaseFailures,
String scrollId,
BytesReference searchContextId
) {
Expand All @@ -588,6 +591,7 @@ private SearchResponse buildSearchResponse(
skippedCount,
buildTookInMillis(),
failures,
phaseFailures,
clusters,
searchContextId
);
Expand Down Expand Up @@ -623,7 +627,13 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
searchContextId = null;
}
}
ActionListener.respondAndRelease(listener, buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));

PhaseFailure[] subFailures = phaseFailures.toArray(PhaseFailure.EMPTY_ARRAY);

ActionListener.respondAndRelease(
listener,
buildSearchResponse(internalSearchResponse, failures, subFailures, scrollId, searchContextId)
);
}
}

Expand Down Expand Up @@ -661,6 +671,10 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
listener.onFailure(exception);
}

/**
 * Records a failure against the named phase by appending a new {@link PhaseFailure}
 * to this action's {@code phaseFailures} list.
 *
 * @param phase     name of the phase the failure is attributed to
 * @param exception the exception to store alongside the phase name
 */
public void addPhaseFailure(String phase, Exception exception) {
    final PhaseFailure recorded = new PhaseFailure(phase, exception);
    phaseFailures.add(recorded);
}

/**
* Releases a search context with the given context ID on the node the given connection is connected to.
* @see org.elasticsearch.search.query.QuerySearchResult#getContextId()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
 * Pairs the name of a search phase with the exception recorded for it.
 * <p>
 * A {@code PhaseFailure} can be written to and read back from a transport stream
 * (phase name first, then the exception) and can render itself as an XContent object
 * holding a {@code phase} field and a nested {@code failure} object.
 *
 * @param phase   name of the phase the failure belongs to
 * @param failure the exception associated with that phase
 */
public record PhaseFailure(String phase, Exception failure) implements ToXContentObject, Writeable {

    /** Shared zero-length array, usable wherever "no phase failures" has to be expressed. */
    public static final PhaseFailure[] EMPTY_ARRAY = new PhaseFailure[0];

    /** XContent field name under which the phase name is emitted. */
    public static final String PHASE_FIELD = "phase";

    /** XContent field name of the nested object describing the exception. */
    public static final String FAILURE_FIELD = "failure";

    /**
     * Deserializing constructor: reads the phase name followed by the exception,
     * mirroring the order used by {@link #writeTo(StreamOutput)}.
     *
     * @param in stream to read from
     * @throws IOException if reading from the stream fails
     */
    public PhaseFailure(StreamInput in) throws IOException {
        this(in.readString(), in.readException());
    }

    /**
     * Serializes this failure as the phase name followed by the exception.
     *
     * @param out stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(phase);
        out.writeException(failure);
    }

    /**
     * Renders this failure as an object with the phase name and a nested object whose
     * content is produced by {@code ElasticsearchException.generateThrowableXContent}.
     *
     * @param builder builder to append to
     * @param params  rendering parameters forwarded to the exception rendering
     * @return the same {@code builder}, after both opened objects have been closed
     * @throws IOException if the builder fails to write
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        // Open the outer object, emit the phase name, then open the nested failure object.
        builder.startObject().field(PHASE_FIELD, phase).startObject(FAILURE_FIELD);
        ElasticsearchException.generateThrowableXContent(builder, params, failure);
        // First endObject closes the nested failure object, the second closes the outer one.
        return builder.endObject().endObject();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,10 @@ public void onResponse(RankFeatureDoc[] docsWithUpdatedScores) {
@Override
public void onFailure(Exception e) {
if (rankFeaturePhaseRankCoordinatorContext.failuresAllowed()) {
// TODO: handle the exception somewhere
// don't want to log the entire stack trace, it's not helpful here
logger.warn("Exception computing updated ranks, continuing with existing ranks: {}", e.toString());
context.addPhaseFailure("reranking", e);

// use the existing score docs as-is
// downstream things expect every doc to have a score, so we need to infer a score here
// if the doc doesn't otherwise have one. We can use the rank to infer a possible score instead (1/rank).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
Expand Down Expand Up @@ -54,8 +55,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;

/**
* A response of a search request.
*/
Expand All @@ -71,6 +70,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
public static final ParseField TIMED_OUT = new ParseField("timed_out");
public static final ParseField TERMINATED_EARLY = new ParseField("terminated_early");
public static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases");
public static final ParseField PHASE_FAILURES = new ParseField("phase_failures");

private final SearchHits hits;
private final InternalAggregations aggregations;
Expand All @@ -85,6 +85,7 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
private final int successfulShards;
private final int skippedShards;
private final ShardSearchFailure[] shardFailures;
private final PhaseFailure[] phaseFailures;
private final Clusters clusters;
private final long tookInMillis;

Expand All @@ -109,14 +110,14 @@ public SearchResponse(StreamInput in) throws IOException {
this.numReducePhases = in.readVInt();
totalShards = in.readVInt();
successfulShards = in.readVInt();
int size = in.readVInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
shardFailures = in.readArray(
ShardSearchFailure::readShardSearchFailure,
s -> s == 0 ? ShardSearchFailure.EMPTY_ARRAY : new ShardSearchFailure[s]
);
if (in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_SUBSIDIARY_FAILURES)) {
phaseFailures = in.readArray(PhaseFailure::new, s -> s == 0 ? PhaseFailure.EMPTY_ARRAY : new PhaseFailure[s]);
} else {
shardFailures = new ShardSearchFailure[size];
for (int i = 0; i < shardFailures.length; i++) {
shardFailures[i] = readShardSearchFailure(in);
}
phaseFailures = PhaseFailure.EMPTY_ARRAY;
}
clusters = new Clusters(in);
scrollId = in.readOptionalString();
Expand All @@ -139,6 +140,7 @@ public SearchResponse(
int skippedShards,
long tookInMillis,
ShardSearchFailure[] shardFailures,
PhaseFailure[] phaseFailures,
Clusters clusters
) {
this(
Expand All @@ -155,6 +157,7 @@ public SearchResponse(
skippedShards,
tookInMillis,
shardFailures,
phaseFailures,
clusters,
null
);
Expand All @@ -168,6 +171,7 @@ public SearchResponse(
int skippedShards,
long tookInMillis,
ShardSearchFailure[] shardFailures,
PhaseFailure[] phaseFailures,
Clusters clusters,
BytesReference pointInTimeId
) {
Expand All @@ -185,6 +189,7 @@ public SearchResponse(
skippedShards,
tookInMillis,
shardFailures,
phaseFailures,
clusters,
pointInTimeId
);
Expand All @@ -204,6 +209,7 @@ public SearchResponse(
int skippedShards,
long tookInMillis,
ShardSearchFailure[] shardFailures,
PhaseFailure[] phaseFailures,
Clusters clusters,
BytesReference pointInTimeId
) {
Expand All @@ -223,6 +229,7 @@ public SearchResponse(
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.shardFailures = shardFailures;
this.phaseFailures = phaseFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
Expand Down Expand Up @@ -348,7 +355,14 @@ public int getFailedShards() {
* The failures that occurred during the search.
*/
public ShardSearchFailure[] getShardFailures() {
return this.shardFailures;
return shardFailures;
}

/**
 * Returns the failures that occurred in specific phases but did not stop results from being returned.
 * The array held by this response is handed back as-is, without copying.
 *
 * @return the phase failures stored on this response
 */
public PhaseFailure[] getPhaseFailures() {
return phaseFailures;
}

/**
Expand Down Expand Up @@ -402,6 +416,9 @@ private Iterator<ToXContent> getToXContentIterator(boolean wrapInObject, ToXCont
return Iterators.concat(
wrapInObject ? ChunkedToXContentHelper.startObject() : Collections.emptyIterator(),
ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent),
CollectionUtils.isEmpty(phaseFailures)
? Collections.emptyIterator()
: ChunkedToXContentHelper.array(PHASE_FAILURES.getPreferredName(), Iterators.forArray(phaseFailures)),
Iterators.single(clusters),
hits.toXContentChunked(params),
aggregations == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(aggregations),
Expand Down Expand Up @@ -453,10 +470,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numReducePhases);
out.writeVInt(totalShards);
out.writeVInt(successfulShards);

out.writeVInt(shardFailures.length);
for (ShardSearchFailure shardSearchFailure : shardFailures) {
shardSearchFailure.writeTo(out);
out.writeArray(shardFailures);
if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_SUBSIDIARY_FAILURES)) {
out.writeArray(phaseFailures);
}
clusters.writeTo(out);
out.writeOptionalString(scrollId);
Expand Down Expand Up @@ -1150,6 +1166,7 @@ public static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters
0,
tookInMillisSupplier.get(),
ShardSearchFailure.EMPTY_ARRAY,
PhaseFailure.EMPTY_ARRAY,
clusters,
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
// the current reduce phase counts as one
int numReducePhases = 1;
List<ShardSearchFailure> failures = new ArrayList<>();
List<PhaseFailure> phaseFailures = new ArrayList<>();
Map<String, SearchProfileShardResult> profileResults = new HashMap<>();
List<InternalAggregations> aggs = new ArrayList<>();
Map<ShardIdAndClusterAlias, Integer> shards = new TreeMap<>();
Expand All @@ -145,6 +146,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
numReducePhases += searchResponse.getNumReducePhases();

Collections.addAll(failures, searchResponse.getShardFailures());
Collections.addAll(phaseFailures, searchResponse.getPhaseFailures());

profileResults.putAll(searchResponse.getProfileResults());

Expand Down Expand Up @@ -213,6 +215,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
? InternalAggregations.EMPTY
: InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forFinalReduction());
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
PhaseFailure[] subFailures = phaseFailures.toArray(PhaseFailure.EMPTY_ARRAY);
SearchProfileResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileResults(profileResults);
// make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
Expand All @@ -231,6 +234,7 @@ public SearchResponse getMergedResponse(Clusters clusters) {
skippedShards,
tookInMillis,
shardFailures,
subFailures,
clusters,
null
);
Expand Down
Loading