diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index 31d96d29c5e5f..e058a3c83d41c 100644
--- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -13,7 +13,6 @@
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.OriginalIndices;
@@ -79,11 +78,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     protected final SearchTask task;
     protected final SearchPhaseResults<Result> results;
     private final long clusterStateVersion;
-    private final TransportVersion minTransportVersion;
     protected final Map<String, AliasFilter> aliasFilter;
     protected final Map<String, Float> concreteIndexBoosts;
     private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
-    private final Object shardFailuresMutex = new Object();
     private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
     private final AtomicInteger successfulOps;
     protected final SearchTimeProvider timeProvider;
@@ -93,8 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     protected final SearchShardIterator[] shardIterators;
     private final AtomicInteger outstandingShards;
     private final int maxConcurrentRequestsPerNode;
-    private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
-    private final boolean throttleConcurrentRequests;
+    private final Map<String, PendingExecutions> pendingExecutionsPerNode;
     private final AtomicBoolean requestCancelled = new AtomicBoolean();
     private final int skippedCount;
 
@@ -142,7 +138,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
         Arrays.sort(shardIterators);
         this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
         // in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle
-        this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
+        this.pendingExecutionsPerNode = maxConcurrentRequestsPerNode < shardsIts.size() ? new ConcurrentHashMap<>() : null;
         this.timeProvider = timeProvider;
         this.logger = logger;
         this.searchTransportService = searchTransportService;
@@ -153,7 +149,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
         this.nodeIdToConnection = nodeIdToConnection;
         this.concreteIndexBoosts = concreteIndexBoosts;
         this.clusterStateVersion = clusterState.version();
-        this.minTransportVersion = clusterState.getMinTransportVersion();
         this.aliasFilter = aliasFilter;
         this.results = resultConsumer;
         // register the release of the query consumer to free up the circuit breaker memory
@@ -254,7 +249,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
     }
 
     protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
-        if (throttleConcurrentRequests) {
+        var pendingExecutionsPerNode = this.pendingExecutionsPerNode;
+        if (pendingExecutionsPerNode != null) {
             var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
                 shard.getNodeId(),
                 n -> new PendingExecutions(maxConcurrentRequestsPerNode)
@@ -464,7 +460,7 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
         AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
         // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
         if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
-            synchronized (shardFailuresMutex) {
+            synchronized (this.shardFailures) {
                 shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
                 if (shardFailures == null) { // still null so we are the first and create a new instance
                     shardFailures = new AtomicArray<>(getNumShards());
@@ -585,10 +581,6 @@ private SearchResponse buildSearchResponse(
         );
     }
 
-    boolean buildPointInTimeFromSearchResults() {
-        return false;
-    }
-
     /**
      * Builds and sends the final search response back to the user.
      *
@@ -602,23 +594,25 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
         if (allowPartialResults == false && failures.length > 0) {
             raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
         } else {
-            final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null;
-            final BytesReference searchContextId;
-            if (buildPointInTimeFromSearchResults()) {
-                searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion, failures);
-            } else {
-                if (request.source() != null
-                    && request.source().pointInTimeBuilder() != null
-                    && request.source().pointInTimeBuilder().singleSession() == false) {
-                    searchContextId = request.source().pointInTimeBuilder().getEncodedId();
-                } else {
-                    searchContextId = null;
-                }
-            }
-            ActionListener.respondAndRelease(listener, buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
+            ActionListener.respondAndRelease(
+                listener,
+                buildSearchResponse(
+                    internalSearchResponse,
+                    failures,
+                    request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null,
+                    buildSearchContextId(failures)
+                )
+            );
         }
     }
 
+    protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
+        var source = request.source();
+        return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
+            ? source.pointInTimeBuilder().getEncodedId()
+            : null;
+    }
+
     /**
      * This method will communicate a fatal phase failure back to the user. In contrast to a shard failure
      * will this method immediately fail the search request and return the failure to the issuer of the request
diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java
index 357c65f2a54c6..8b08a6b18bd79 100644
--- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java
@@ -12,6 +12,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -24,6 +25,7 @@
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -217,6 +219,7 @@ void runOpenPointInTimePhase(
     ) {
         assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
             : searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
+        TransportVersion minTransportVersion = clusterState.getMinTransportVersion();
         new AbstractSearchAsyncAction<>(
             actionName,
             logger,
@@ -266,8 +269,8 @@ protected void run() {
             }
 
             @Override
-            boolean buildPointInTimeFromSearchResults() {
-                return true;
+            protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
+                return SearchContextId.encode(results.getAtomicArray().asList(), aliasFilter, minTransportVersion, failures);
             }
         }.start();
     }
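For readers skimming the change, a minimal standalone sketch of the two patterns this diff applies, using simplified hypothetical types (AsyncActionSketch, OpenPointInTimeSketch) rather than the real Elasticsearch classes: a nullable per-node map now doubles as the "throttling enabled" flag, and the point-in-time context id moves from a boolean hook into an overridable buildSearchContextId method.

// Hypothetical, simplified illustration of the refactorings above; not Elasticsearch code.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

abstract class AsyncActionSketch {
    // null means "no throttling needed" (fewer shards than the per-node limit),
    // replacing the old boolean flag plus an always-allocated ConcurrentHashMap.
    private final Map<String, Object> pendingExecutionsPerNode;

    AsyncActionSketch(int maxConcurrentRequestsPerNode, int shardCount) {
        this.pendingExecutionsPerNode = maxConcurrentRequestsPerNode < shardCount ? new ConcurrentHashMap<>() : null;
    }

    final void performPhaseOnShard(String nodeId) {
        var pendingExecutionsPerNode = this.pendingExecutionsPerNode;
        if (pendingExecutionsPerNode != null) {
            // per-node throttling bookkeeping would happen here
            pendingExecutionsPerNode.computeIfAbsent(nodeId, n -> new Object());
        }
        // ... send the shard-level request ...
    }

    // Decides which context id goes into the final response: by default whatever id the
    // request already carries (none in this sketch); subclasses may build one from results.
    protected byte[] buildSearchContextId() {
        return null;
    }
}

// The open-point-in-time variant overrides the hook instead of flipping a boolean.
class OpenPointInTimeSketch extends AsyncActionSketch {
    OpenPointInTimeSketch(int maxConcurrentRequestsPerNode, int shardCount) {
        super(maxConcurrentRequestsPerNode, shardCount);
    }

    @Override
    protected byte[] buildSearchContextId() {
        return new byte[] { /* an encoded point-in-time id built from the per-shard results */ };
    }
}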