From e89714815bc6efb94c3ecb1278e38faa735757e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 11 Jul 2025 12:46:56 +0200 Subject: [PATCH 1/2] Fix assertion error search reduce errors When receiving exceptions from more than one node on data node search reduction, SearchQueryThenFetchAsyncAction will raise more than one phase failure. This can lead to calling the listener in AbstractSearchAsyncAction more than once, which in turn in tests trips an assertion in ActionListener#assertFirstRun. --- .../search/sort/FieldSortIT.java | 25 +++++++++++++++++++ .../search/AbstractSearchAsyncAction.java | 5 ++++ .../SearchQueryThenFetchAsyncAction.java | 1 + 3 files changed, 31 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/sort/FieldSortIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/sort/FieldSortIT.java index 97da362eebe82..ad44f19a14079 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/sort/FieldSortIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/sort/FieldSortIT.java @@ -2160,6 +2160,31 @@ public void testLongSortOptimizationCorrectResults() { ); } + public void testSortMixedFieldTypesSeveralDocs() { + assertAcked( + prepareCreate("index_long").setMapping("foo", "type=long"), + prepareCreate("index_double").setMapping("foo", "type=double") + ); + + List builders = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + builders.add(prepareIndex("index_long").setId(String.valueOf(i)).setSource("foo", i)); + builders.add(prepareIndex("index_double").setId(String.valueOf(i)).setSource("foo", i)); + } + indexRandom(true, false, builders); + + String errMsg = "Can't sort on field [foo]; the field has incompatible sort types"; + + { // mixing long and double types is not allowed + SearchPhaseExecutionException exc = expectThrows( + SearchPhaseExecutionException.class, + prepareSearch("index_long", "index_double").addSort(new FieldSortBuilder("foo")).setSize(20) + ); + assertThat(exc.getCause().toString(), containsString(errMsg)); + } + } + public void testSortMixedFieldTypes() throws IOException { assertAcked( prepareCreate("index_long").setMapping("foo", "type=long"), diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..57e18bd867793 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -93,6 +93,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + private final AtomicBoolean phaseFailureEncountered = new AtomicBoolean(); // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -621,6 +622,10 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { * @param cause the cause of the phase failure */ public void onPhaseFailure(String phase, String msg, Throwable cause) { + if (phaseFailureEncountered.compareAndSet(false, true) == false) { + // we already encountered a phase failure, so we ignore this one + return; + } raisePhaseFailure(new SearchPhaseExecutionException(phase, msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a8f22eb1cc572..c5df961c2b49a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -533,6 +533,7 @@ public void handleException(TransportException e) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { queryPhaseResultConsumer.failure.compareAndSet(null, cause); } + logger.debug("Raising phase failure for " + cause + " while executing search on node " + routing.nodeId()); onPhaseFailure(getName(), "", cause); } } From 72999c2f02e22b3fe4fabed292fcad63b1982ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 26 Sep 2025 12:41:19 +0200 Subject: [PATCH 2/2] Moving phase failure deduplication flag to SearchQueryThenFetchAsyncAction --- .../action/search/AbstractSearchAsyncAction.java | 5 ----- .../search/SearchQueryThenFetchAsyncAction.java | 13 ++++++++++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 57e18bd867793..e058a3c83d41c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -93,7 +93,6 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; - private final AtomicBoolean phaseFailureEncountered = new AtomicBoolean(); // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -622,10 +621,6 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { * @param cause the cause of the phase failure */ public void onPhaseFailure(String phase, String msg, Throwable cause) { - if (phaseFailureEncountered.compareAndSet(false, true) == false) { - // we already encountered a phase failure, so we ignore this one - return; - } raisePhaseFailure(new SearchPhaseExecutionException(phase, msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index c5df961c2b49a..75a63acd3cfd1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -462,6 +462,7 @@ protected void doRun(Map shardIndexMap) { } } } + final AtomicBoolean phaseFailureEncountered = new AtomicBoolean(false); perNodeQueries.forEach((routing, request) -> { if (request.shards.size() == 1) { executeAsSingleRequest(routing, request.shards.getFirst()); @@ -483,6 +484,7 @@ protected void doRun(Map shardIndexMap) { } searchTransportService.transportService() .sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler() { + @Override public NodeQueryResponse read(StreamInput in) throws IOException { return new NodeQueryResponse(in); @@ -533,8 +535,13 @@ public void handleException(TransportException e) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { queryPhaseResultConsumer.failure.compareAndSet(null, cause); } - logger.debug("Raising phase failure for " + cause + " while executing search on node " + routing.nodeId()); - onPhaseFailure(getName(), "", cause); + if (phaseFailureEncountered.compareAndSet(false, true)) { + logger.debug("Raising phase failure for " + cause + " while executing search on node " + routing.nodeId()); + onPhaseFailure(getName(), "", cause); + } else { + // we already failed the phase, ignore any additional failures and just log them if debug enabled + logger.debug("Ignoring additional phase failure for " + cause + " from search on node " + routing.nodeId()); + } } } }); @@ -542,7 +549,7 @@ public void handleException(TransportException e) { } private void executeWithoutBatching(CanMatchPreFilterSearchPhase.SendingTarget targetNode, NodeQueryRequest request) { - for (ShardToQuery shard : request.shards) { + for (SearchQueryThenFetchAsyncAction.ShardToQuery shard : request.shards) { executeAsSingleRequest(targetNode, shard); } }