Skip to content

Commit 72999c2

Browse files
committed
Moving phase failure deduplication flag to SearchQueryThenFetchAsyncAction
1 parent e897148 commit 72999c2

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9393
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
9494
private final AtomicBoolean requestCancelled = new AtomicBoolean();
9595
private final int skippedCount;
96-
private final AtomicBoolean phaseFailureEncountered = new AtomicBoolean();
9796

9897
// protected for tests
9998
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -622,10 +621,6 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
622621
* @param cause the cause of the phase failure
623622
*/
624623
public void onPhaseFailure(String phase, String msg, Throwable cause) {
625-
if (phaseFailureEncountered.compareAndSet(false, true) == false) {
626-
// we already encountered a phase failure, so we ignore this one
627-
return;
628-
}
629624
raisePhaseFailure(new SearchPhaseExecutionException(phase, msg, cause, buildShardFailures()));
630625
}
631626

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
462462
}
463463
}
464464
}
465+
final AtomicBoolean phaseFailureEncountered = new AtomicBoolean(false);
465466
perNodeQueries.forEach((routing, request) -> {
466467
if (request.shards.size() == 1) {
467468
executeAsSingleRequest(routing, request.shards.getFirst());
@@ -483,6 +484,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
483484
}
484485
searchTransportService.transportService()
485486
.sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler<NodeQueryResponse>() {
487+
486488
@Override
487489
public NodeQueryResponse read(StreamInput in) throws IOException {
488490
return new NodeQueryResponse(in);
@@ -533,16 +535,21 @@ public void handleException(TransportException e) {
533535
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
534536
queryPhaseResultConsumer.failure.compareAndSet(null, cause);
535537
}
536-
logger.debug("Raising phase failure for " + cause + " while executing search on node " + routing.nodeId());
537-
onPhaseFailure(getName(), "", cause);
538+
if (phaseFailureEncountered.compareAndSet(false, true)) {
539+
logger.debug("Raising phase failure for " + cause + " while executing search on node " + routing.nodeId());
540+
onPhaseFailure(getName(), "", cause);
541+
} else {
542+
// we already failed the phase, ignore any additional failures and just log them if debug enabled
543+
logger.debug("Ignoring additional phase failure for " + cause + " from search on node " + routing.nodeId());
544+
}
538545
}
539546
}
540547
});
541548
});
542549
}
543550

544551
private void executeWithoutBatching(CanMatchPreFilterSearchPhase.SendingTarget targetNode, NodeQueryRequest request) {
545-
for (ShardToQuery shard : request.shards) {
552+
for (SearchQueryThenFetchAsyncAction.ShardToQuery shard : request.shards) {
546553
executeAsSingleRequest(targetNode, shard);
547554
}
548555
}

0 commit comments

Comments
 (0)