Simplify counting in AbstractSearchAsyncAction #120593
Changes from 1 commit
```diff
@@ -42,6 +42,8 @@
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.transport.Transport;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
```
```diff
@@ -90,15 +92,25 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final Object shardFailuresMutex = new Object();
     private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
     private final AtomicInteger successfulOps = new AtomicInteger();
-    private final AtomicInteger skippedOps = new AtomicInteger();
     private final SearchTimeProvider timeProvider;
     private final SearchResponse.Clusters clusters;
 
     protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
     protected final GroupShardsIterator<SearchShardIterator> shardsIts;
     private final SearchShardIterator[] shardIterators;
-    private final int expectedTotalOps;
-    private final AtomicInteger totalOps = new AtomicInteger();
+
+    private static final VarHandle OUTSTANDING_SHARDS;
+
+    static {
+        try {
+            OUTSTANDING_SHARDS = MethodHandles.lookup().findVarHandle(AbstractSearchAsyncAction.class, "outstandingShards", int.class);
+        } catch (Exception e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    @SuppressWarnings("unused") // only accessed via #OUTSTANDING_SHARDS
+    private int outstandingShards;
     private final int maxConcurrentRequestsPerNode;
     private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
     private final boolean throttleConcurrentRequests;
```
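The new completion tracking replaces the `skippedOps`/`totalOps` `AtomicInteger`s with a single plain `int` driven through a `VarHandle`. A minimal, self-contained sketch of that idiom under the same assumptions (class and field names here are illustrative, not from the PR):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Sketch of the VarHandle countdown idiom the PR introduces: a plain int
// field accessed atomically via a VarHandle instead of an AtomicInteger,
// saving one object allocation per search action.
class ShardCountdown {

    private static final VarHandle REMAINING;

    static {
        try {
            REMAINING = MethodHandles.lookup().findVarHandle(ShardCountdown.class, "remaining", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @SuppressWarnings("unused") // only accessed via REMAINING
    private int remaining;

    ShardCountdown(int shards) {
        // A release store is enough here: the value is published before any
        // concurrent decrement can observe this instance.
        REMAINING.setRelease(this, shards);
    }

    /** Returns true exactly once, when the last outstanding shard finishes. */
    boolean countDown() {
        // getAndAdd returns the previous value, so 1 means "we just hit 0".
        return (int) REMAINING.getAndAdd(this, -1) == 1;
    }
}
```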
```diff
@@ -139,18 +151,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
         }
         this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
         this.shardsIts = new GroupShardsIterator<>(iterators);
+        OUTSTANDING_SHARDS.setRelease(this, shardsIts.size());
         this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
         // we later compute the shard index based on the natural order of the shards
         // that participate in the search request. This means that this number is
         // consistent between two requests that target the same shards.
         Arrays.sort(shardIterators);
-        // we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
-        // it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
-        // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
-        // we process hence we add one for the non active partition here.
-        this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
         this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
         // in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle
         this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
```

Member comment: For posterity, this was introduced with faefc77. The intent was to account for inactive shards (those still in the process of being allocated) as 1, so that they are expected to fail; otherwise their failure would make the search complete earlier than expected, missing results from other active shards. In principle, I agree that counting each shard as 1, regardless of how many copies it has (inactive, primary only, one replica, or multiple replicas), is simpler.
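To make the old accounting the comment describes concrete, here is a rough sketch; the `List<List<String>>` shape is illustrative only (the real `GroupShardsIterator.totalSizeWith1ForEmpty()` walks `SearchShardIterator`s):

```java
import java.util.List;

// Rough model of the old expectedTotalOps accounting: each shard group
// contributes its number of active copies, or 1 when no copy is active, so
// an unassigned shard still has one "expected op" its failure can consume.
final class OldCountingSketch {

    static int totalSizeWith1ForEmpty(List<List<String>> shardCopies) {
        int total = 0;
        for (List<String> copies : shardCopies) {
            total += Math.max(copies.size(), 1);
        }
        return total;
    }

    public static void main(String[] args) {
        // shard 0: primary + replica, shard 1: primary only, shard 2: inactive
        System.out.println(totalSizeWith1ForEmpty(List.of(List.of("p0", "r0"), List.of("p1"), List.of())));
        // prints 4, whereas the new scheme simply counts 3 outstanding shards
    }
}
```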
```diff
@@ -251,9 +257,8 @@ protected final void run() {
 
     void skipShard(SearchShardIterator iterator) {
         successfulOps.incrementAndGet();
-        skippedOps.incrementAndGet();
         assert iterator.skip();
-        successfulShardExecution(iterator);
+        successfulShardExecution();
     }
 
     private static boolean assertExecuteOnStartThread() {
```
```diff
@@ -380,7 +385,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
                 "Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
                 discrepancy,
                 successfulOps.get(),
-                skippedOps.get(),
+                toSkipShardsIts.size(),
                 getNumShards(),
                 currentPhase
             );
```
```diff
@@ -449,17 +454,11 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final
             }
             onShardGroupFailure(shardIndex, shard, e);
         }
-        final int totalOps = this.totalOps.incrementAndGet();
-        if (totalOps == expectedTotalOps) {
-            onPhaseDone();
-        } else if (totalOps > expectedTotalOps) {
-            throw new AssertionError(
-                "unexpected higher total ops [" + totalOps + "] compared to expected [" + expectedTotalOps + "]",
-                new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())
-            );
-        } else {
-            if (lastShard == false) {
-                performPhaseOnShard(shardIndex, shardIt, nextShard);
+        if (lastShard == false) {
+            performPhaseOnShard(shardIndex, shardIt, nextShard);
+        } else {
+            if ((int) OUTSTANDING_SHARDS.getAndAdd(this, -1) == 1) {
+                onPhaseDone();
             }
         }
     }
```
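Put differently, a failing shard copy no longer bumps a global op counter: it either retries the next copy or, on the last copy, gives up the shard group's single countdown token. A simplified model of that failure path, reusing the illustrative `ShardCountdown` sketch from above (all other names are hypothetical stand-ins, not Elasticsearch API):

```java
// Hypothetical, simplified model of the failure path in the hunk above.
class FailurePathSketch {

    private final ShardCountdown countdown;

    FailurePathSketch(int shardGroups) {
        this.countdown = new ShardCountdown(shardGroups);
    }

    void onCopyFailure(int shardIndex, boolean lastCopy) {
        if (lastCopy == false) {
            // another copy of this shard is still available; retry it
            System.out.println("retrying next copy of shard " + shardIndex);
        } else if (countdown.countDown()) {
            // every copy of every shard has reported; exactly one caller gets here
            System.out.println("all shards accounted for, phase done");
        }
    }
}
```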
```diff
@@ -535,10 +534,10 @@ protected void onShardResult(Result result, SearchShardIterator shardIt) {
         if (logger.isTraceEnabled()) {
             logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
         }
-        results.consumeResult(result, () -> onShardResultConsumed(result, shardIt));
+        results.consumeResult(result, () -> onShardResultConsumed(result));
     }
 
-    private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
+    private void onShardResultConsumed(Result result) {
         successfulOps.incrementAndGet();
         // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
         // so its ok concurrency wise to miss potentially the shard failures being created because of another failure
```
```diff
@@ -552,28 +551,12 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
         // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
         // increment all the "future" shards to update the total ops since we some may work and some may not...
         // and when that happens, we break on total ops, so we must maintain them
-        successfulShardExecution(shardIt);
+        successfulShardExecution();
     }
 
-    private void successfulShardExecution(SearchShardIterator shardsIt) {
-        final int remainingOpsOnIterator;
-        if (shardsIt.skip()) {
-            // It's possible that we're skipping a shard that's unavailable
-            // but its range was available in the IndexMetadata, in that
-            // case the shardsIt.remaining() would be 0, expectedTotalOps
-            // accounts for unavailable shards too.
-            remainingOpsOnIterator = Math.max(shardsIt.remaining(), 1);
-        } else {
-            remainingOpsOnIterator = shardsIt.remaining() + 1;
-        }
-        final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
-        if (xTotalOps == expectedTotalOps) {
+    private void successfulShardExecution() {
+        if ((int) OUTSTANDING_SHARDS.getAndAdd(this, -1) == 1) {
             onPhaseDone();
-        } else if (xTotalOps > expectedTotalOps) {
-            throw new AssertionError(
-                "unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]",
-                new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())
-            );
         }
     }
```

Member comment: definitely good to get rid of this special case.
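The correctness of the new scheme hinges on `getAndAdd` returning the previous value, so exactly one caller observes `1` and is allowed to call `onPhaseDone()`. A small demo of that property, again using the illustrative `ShardCountdown` sketch rather than the real class:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Checks the "last decrement wins" property the new counting relies on:
// however the shard completions interleave, exactly one of them wins the
// countdown and stands in for the onPhaseDone() call.
public class CountdownDemo {

    public static void main(String[] args) {
        int shards = 64;
        ShardCountdown countdown = new ShardCountdown(shards);
        AtomicInteger phaseDoneCalls = new AtomicInteger();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[shards];
        for (int i = 0; i < shards; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                if (countdown.countDown()) {
                    phaseDoneCalls.incrementAndGet(); // stands in for onPhaseDone()
                }
            });
        }
        CompletableFuture.allOf(futures).join();
        System.out.println("onPhaseDone invocations: " + phaseDoneCalls.get()); // always 1
    }
}
```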
```diff
@@ -640,7 +623,7 @@ private SearchResponse buildSearchResponse(
             scrollId,
             getNumShards(),
             numSuccess,
-            skippedOps.get(),
+            toSkipShardsIts.size(),
             buildTookInMillis(),
             failures,
             clusters,
```