1313import org .apache .lucene .util .SetOnce ;
1414import org .elasticsearch .ElasticsearchException ;
1515import org .elasticsearch .ExceptionsHelper ;
16- import org .elasticsearch .TransportVersion ;
1716import org .elasticsearch .action .ActionListener ;
1817import org .elasticsearch .action .NoShardAvailableActionException ;
1918import org .elasticsearch .action .OriginalIndices ;
@@ -79,11 +78,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
7978 protected final SearchTask task ;
8079 protected final SearchPhaseResults <Result > results ;
8180 private final long clusterStateVersion ;
82- private final TransportVersion minTransportVersion ;
8381 protected final Map <String , AliasFilter > aliasFilter ;
8482 protected final Map <String , Float > concreteIndexBoosts ;
8583 private final SetOnce <AtomicArray <ShardSearchFailure >> shardFailures = new SetOnce <>();
86- private final Object shardFailuresMutex = new Object ();
8784 private final AtomicBoolean hasShardResponse = new AtomicBoolean (false );
8885 private final AtomicInteger successfulOps ;
8986 protected final SearchTimeProvider timeProvider ;
@@ -93,8 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9390 protected final SearchShardIterator [] shardIterators ;
9491 private final AtomicInteger outstandingShards ;
9592 private final int maxConcurrentRequestsPerNode ;
96- private final Map <String , PendingExecutions > pendingExecutionsPerNode = new ConcurrentHashMap <>();
97- private final boolean throttleConcurrentRequests ;
93+ private final Map <String , PendingExecutions > pendingExecutionsPerNode ;
9894 private final AtomicBoolean requestCancelled = new AtomicBoolean ();
9995 private final int skippedCount ;
10096
@@ -142,7 +138,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
142138 Arrays .sort (shardIterators );
143139 this .maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode ;
144140 // in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle
145- this .throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts .size ();
141+ this .pendingExecutionsPerNode = maxConcurrentRequestsPerNode < shardsIts .size () ? new ConcurrentHashMap <>() : null ;
146142 this .timeProvider = timeProvider ;
147143 this .logger = logger ;
148144 this .searchTransportService = searchTransportService ;
@@ -153,7 +149,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
153149 this .nodeIdToConnection = nodeIdToConnection ;
154150 this .concreteIndexBoosts = concreteIndexBoosts ;
155151 this .clusterStateVersion = clusterState .version ();
156- this .minTransportVersion = clusterState .getMinTransportVersion ();
157152 this .aliasFilter = aliasFilter ;
158153 this .results = resultConsumer ;
159154 // register the release of the query consumer to free up the circuit breaker memory
@@ -254,7 +249,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
254249 }
255250
256251 protected final void performPhaseOnShard (final int shardIndex , final SearchShardIterator shardIt , final SearchShardTarget shard ) {
257- if (throttleConcurrentRequests ) {
252+ var pendingExecutionsPerNode = this .pendingExecutionsPerNode ;
253+ if (pendingExecutionsPerNode != null ) {
258254 var pendingExecutions = pendingExecutionsPerNode .computeIfAbsent (
259255 shard .getNodeId (),
260256 n -> new PendingExecutions (maxConcurrentRequestsPerNode )
@@ -464,7 +460,7 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
464460 AtomicArray <ShardSearchFailure > shardFailures = this .shardFailures .get ();
465461 // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
466462 if (shardFailures == null ) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
467- synchronized (shardFailuresMutex ) {
463+ synchronized (this . shardFailures ) {
468464 shardFailures = this .shardFailures .get (); // read again otherwise somebody else has created it?
469465 if (shardFailures == null ) { // still null so we are the first and create a new instance
470466 shardFailures = new AtomicArray <>(getNumShards ());
@@ -585,10 +581,6 @@ private SearchResponse buildSearchResponse(
585581 );
586582 }
587583
588- boolean buildPointInTimeFromSearchResults () {
589- return false ;
590- }
591-
592584 /**
593585 * Builds and sends the final search response back to the user.
594586 *
@@ -602,23 +594,25 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
602594 if (allowPartialResults == false && failures .length > 0 ) {
603595 raisePhaseFailure (new SearchPhaseExecutionException ("" , "Shard failures" , null , failures ));
604596 } else {
605- final String scrollId = request .scroll () != null ? TransportSearchHelper .buildScrollId (queryResults ) : null ;
606- final BytesReference searchContextId ;
607- if (buildPointInTimeFromSearchResults ()) {
608- searchContextId = SearchContextId .encode (queryResults .asList (), aliasFilter , minTransportVersion , failures );
609- } else {
610- if (request .source () != null
611- && request .source ().pointInTimeBuilder () != null
612- && request .source ().pointInTimeBuilder ().singleSession () == false ) {
613- searchContextId = request .source ().pointInTimeBuilder ().getEncodedId ();
614- } else {
615- searchContextId = null ;
616- }
617- }
618- ActionListener .respondAndRelease (listener , buildSearchResponse (internalSearchResponse , failures , scrollId , searchContextId ));
597+ ActionListener .respondAndRelease (
598+ listener ,
599+ buildSearchResponse (
600+ internalSearchResponse ,
601+ failures ,
602+ request .scroll () != null ? TransportSearchHelper .buildScrollId (queryResults ) : null ,
603+ buildSearchContextId (failures )
604+ )
605+ );
619606 }
620607 }
621608
609+ protected BytesReference buildSearchContextId (ShardSearchFailure [] failures ) {
610+ var source = request .source ();
611+ return source != null && source .pointInTimeBuilder () != null && source .pointInTimeBuilder ().singleSession () == false
612+ ? source .pointInTimeBuilder ().getEncodedId ()
613+ : null ;
614+ }
615+
622616 /**
623617 * This method will communicate a fatal phase failure back to the user. In contrast to a shard failure
624618 * will this method immediately fail the search request and return the failure to the issuer of the request
0 commit comments