From 27f270a8bb374b9ac5f643aa68caf8901baf0ec6 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 29 Apr 2025 11:30:51 +0200 Subject: [PATCH 1/4] Cleanup needlessly holding Exceptions in the can_match phase Exceptions can be quite expensive heap-wise and we don't use them here any longer, not even for logging -> replace atomic array with a set of shard indices and dry up the logic a tiny bit as a result as well. --- .../search/CanMatchPreFilterSearchPhase.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index e42f8127c5e97..326bf68431736 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -38,8 +38,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.BiFunction; import static org.elasticsearch.core.Strings.format; @@ -277,12 +278,11 @@ private Map> groupByNode(List shards; private final CountDown countDown; - private final AtomicReferenceArray failedResponses; + private final Set failedResponses = ConcurrentHashMap.newKeySet(); Round(List shards) { this.shards = shards; this.countDown = new CountDown(shards.size()); - this.failedResponses = new AtomicReferenceArray<>(shardsIts.size()); } @Override @@ -296,9 +296,7 @@ protected void doRun() { if (entry.getKey().nodeId == null) { // no target node: just mark the requests as failed - for (CanMatchNodeRequest.Shard shard : shardLevelRequests) { - onOperationFailed(shard.getShardRequestIndex(), null); - } + onAllFailed(shardLevelRequests); continue; } @@ -321,37 +319,39 @@ public void onResponse(CanMatchNodeResponse canMatchNodeResponse) { } else { Exception failure = response.getException(); assert failure != null; - onOperationFailed(shardLevelRequests.get(i).getShardRequestIndex(), failure); + onOperationFailed(shardLevelRequests.get(i).getShardRequestIndex()); } } } @Override public void onFailure(Exception e) { - for (CanMatchNodeRequest.Shard shard : shardLevelRequests) { - onOperationFailed(shard.getShardRequestIndex(), e); - } + onAllFailed(shardLevelRequests); } } ); } catch (Exception e) { - for (CanMatchNodeRequest.Shard shard : shardLevelRequests) { - onOperationFailed(shard.getShardRequestIndex(), e); - } + onAllFailed(shardLevelRequests); } } } + private void onAllFailed(List shardLevelRequests) { + for (CanMatchNodeRequest.Shard shard : shardLevelRequests) { + onOperationFailed(shard.getShardRequestIndex()); + } + } + private void onOperation(int idx, CanMatchShardResponse response) { - failedResponses.set(idx, null); + failedResponses.add(idx); consumeResult(response); if (countDown.countDown()) { finishRound(); } } - private void onOperationFailed(int idx, Exception e) { - failedResponses.set(idx, e); + private void onOperationFailed(int idx) { + failedResponses.add(idx); // we have to carry over shard failures in order to account for them in the response. consumeResult(idx, true, null); if (countDown.countDown()) { @@ -363,8 +363,7 @@ private void finishRound() { List remainingShards = new ArrayList<>(); for (SearchShardIterator ssi : shards) { int shardIndex = shardItIndexMap.get(ssi); - Exception failedResponse = failedResponses.get(shardIndex); - if (failedResponse != null) { + if (failedResponses.contains(shardIndex)) { remainingShards.add(ssi); } } From 99a702cf112c3955b4caf29adb93b9c05b43da4e Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 29 Apr 2025 11:40:56 +0200 Subject: [PATCH 2/4] facepalm --- .../action/search/CanMatchPreFilterSearchPhase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 326bf68431736..6412325ed2520 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -343,7 +343,7 @@ private void onAllFailed(List shardLevelRequests) { } private void onOperation(int idx, CanMatchShardResponse response) { - failedResponses.add(idx); + failedResponses.remove(idx); consumeResult(response); if (countDown.countDown()) { finishRound(); From 3b68ca879476b99d04c90ea8b4cd69631b0c681a Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 29 Apr 2025 12:34:27 +0200 Subject: [PATCH 3/4] facepalm --- .../search/CanMatchPreFilterSearchPhase.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 6412325ed2520..0f2d251ec8234 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -229,14 +229,14 @@ private void runCoordinatorRewritePhase() { private void consumeResult(boolean canMatch, ShardSearchRequest request) { CanMatchShardResponse result = new CanMatchShardResponse(canMatch, null); result.setShardIndex(request.shardRequestIndex()); - consumeResult(result); + consumeResult(request.shardRequestIndex(), result); } - private void consumeResult(CanMatchShardResponse result) { + private void consumeResult(int index, CanMatchShardResponse result) { final boolean canMatch = result.canMatch(); final MinAndMax minAndMax = result.estimatedMinAndMax(); if (canMatch || minAndMax != null) { - consumeResult(result.getShardIndex(), canMatch, minAndMax); + consumeResult(index, canMatch, minAndMax); } } @@ -295,8 +295,11 @@ protected void doRun() { List shardLevelRequests = canMatchNodeRequest.getShardLevelRequests(); if (entry.getKey().nodeId == null) { - // no target node: just mark the requests as failed - onAllFailed(shardLevelRequests); + // no target node: just mark as matching and have the next phase fail the shard operation if needed + for (CanMatchNodeRequest.Shard shard : shardLevelRequests) { + var resp = new CanMatchShardResponse(true, null); + onOperation(shard.getShardRequestIndex(), resp); + } continue; } @@ -344,7 +347,7 @@ private void onAllFailed(List shardLevelRequests) { private void onOperation(int idx, CanMatchShardResponse response) { failedResponses.remove(idx); - consumeResult(response); + consumeResult(idx, response); if (countDown.countDown()) { finishRound(); } From ec3a8be5cace242dad03d083683a16750b207284 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 29 Apr 2025 12:37:15 +0200 Subject: [PATCH 4/4] facepalm --- .../search/CanMatchPreFilterSearchPhase.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 0f2d251ec8234..24aabc59d84a6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -227,25 +227,17 @@ private void runCoordinatorRewritePhase() { } private void consumeResult(boolean canMatch, ShardSearchRequest request) { - CanMatchShardResponse result = new CanMatchShardResponse(canMatch, null); - result.setShardIndex(request.shardRequestIndex()); - consumeResult(request.shardRequestIndex(), result); + consumeResult(request.shardRequestIndex(), canMatch, null); } - private void consumeResult(int index, CanMatchShardResponse result) { - final boolean canMatch = result.canMatch(); - final MinAndMax minAndMax = result.estimatedMinAndMax(); - if (canMatch || minAndMax != null) { - consumeResult(index, canMatch, minAndMax); - } - } - - private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax minAndMax) { + private void consumeResult(int shardIndex, boolean canMatch, MinAndMax minAndMax) { if (canMatch) { - possibleMatches.set(shardIndex); - numPossibleMatches++; + synchronized (this) { + possibleMatches.set(shardIndex); + numPossibleMatches++; + minAndMaxes[shardIndex] = minAndMax; + } } - minAndMaxes[shardIndex] = minAndMax; } private void checkNoMissingShards(List shards) { @@ -347,7 +339,7 @@ private void onAllFailed(List shardLevelRequests) { private void onOperation(int idx, CanMatchShardResponse response) { failedResponses.remove(idx); - consumeResult(idx, response); + consumeResult(idx, response.canMatch(), response.estimatedMinAndMax()); if (countDown.countDown()) { finishRound(); }