Skip to content

Commit 5e1ff8e

Browse files
Remove redundant success counter from AbstractSearchAsyncAction
No need to count successes explicitly: the failure count and the total count already contain all the necessary information here (a fact we also asserted). Removing the counter also revealed some dead code in the phase-transition method.
1 parent 481d91c commit 5e1ff8e

File tree

2 files changed

+13
-80
lines changed

2 files changed

+13
-80
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 13 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
8686
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
8787
private final Object shardFailuresMutex = new Object();
8888
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
89-
private final AtomicInteger successfulOps;
9089
private final SearchTimeProvider timeProvider;
9190
private final SearchResponse.Clusters clusters;
9291

@@ -135,7 +134,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
135134
this.skippedCount = skipped;
136135
this.shardsIts = iterators;
137136
outstandingShards = new AtomicInteger(iterators.size());
138-
successfulOps = new AtomicInteger(skipped);
139137
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
140138
// we later compute the shard index based on the natural order of the shards
141139
// that participate in the search request. This means that this number is
@@ -328,32 +326,16 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
328326
} else {
329327
Boolean allowPartialResults = request.allowPartialSearchResults();
330328
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
331-
if (allowPartialResults == false && successfulOps.get() != getNumShards()) {
329+
if (allowPartialResults == false && shardSearchFailures.length > 0) {
332330
// check if there are actual failures in the atomic array since
333331
// successful retries can reset the failures to null
334-
if (shardSearchFailures.length > 0) {
335-
if (logger.isDebugEnabled()) {
336-
int numShardFailures = shardSearchFailures.length;
337-
shardSearchFailures = ExceptionsHelper.groupBy(shardSearchFailures);
338-
Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
339-
logger.debug(() -> format("%s shards failed for phase: [%s]", numShardFailures, currentPhase), cause);
340-
}
341-
onPhaseFailure(currentPhase, "Partial shards failure", null);
342-
} else {
343-
int discrepancy = getNumShards() - successfulOps.get();
344-
assert discrepancy > 0 : "discrepancy: " + discrepancy;
345-
if (logger.isDebugEnabled()) {
346-
logger.debug(
347-
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
348-
discrepancy,
349-
successfulOps.get(),
350-
skippedCount,
351-
getNumShards(),
352-
currentPhase
353-
);
354-
}
355-
onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null);
332+
if (logger.isDebugEnabled()) {
333+
int numShardFailures = shardSearchFailures.length;
334+
shardSearchFailures = ExceptionsHelper.groupBy(shardSearchFailures);
335+
Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
336+
logger.debug(() -> format("%s shards failed for phase: [%s]", numShardFailures, currentPhase), cause);
356337
}
338+
onPhaseFailure(currentPhase, "Partial shards failure", null);
357339
return;
358340
}
359341
var nextPhase = nextPhaseSupplier.get();
@@ -466,19 +448,10 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
466448
}
467449
}
468450
ShardSearchFailure failure = shardFailures.get(shardIndex);
469-
if (failure == null) {
451+
// the failure is already present, try not to override it with an exception that is less meaningful
452+
// for example, getting illegal shard state
453+
if (failure == null || (TransportActions.isReadOverrideException(e) && e instanceof SearchContextMissingException == false)) {
470454
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
471-
} else {
472-
// the failure is already present, try and not override it with an exception that is less meaningless
473-
// for example, getting illegal shard state
474-
if (TransportActions.isReadOverrideException(e) && (e instanceof SearchContextMissingException == false)) {
475-
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
476-
}
477-
}
478-
479-
if (results.hasResult(shardIndex)) {
480-
assert failure == null : "shard failed before but shouldn't: " + failure;
481-
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
482455
}
483456
}
484457
}
@@ -502,23 +475,13 @@ protected void onShardResult(Result result) {
502475
}
503476

504477
private void onShardResultConsumed(Result result) {
505-
successfulOps.incrementAndGet();
506478
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
507479
// so it's ok concurrency-wise to potentially miss the shard failures being created because of another failure
508480
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex
509481
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
510482
if (shardFailures != null) {
511483
shardFailures.set(result.getShardIndex(), null);
512484
}
513-
// we need to increment successful ops first before we compare the exit condition otherwise if we
514-
// are fast we could concurrently update totalOps but then preempt one of the threads which can
515-
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
516-
// increment all the "future" shards to update the total ops since we some may work and some may not...
517-
// and when that happens, we break on total ops, so we must maintain them
518-
successfulShardExecution();
519-
}
520-
521-
private void successfulShardExecution() {
522485
final int outstanding = outstandingShards.decrementAndGet();
523486
assert outstanding >= 0 : "outstanding: " + outstanding;
524487
if (outstanding == 0) {
@@ -580,15 +543,12 @@ private SearchResponse buildSearchResponse(
580543
String scrollId,
581544
BytesReference searchContextId
582545
) {
583-
int numSuccess = successfulOps.get();
584-
int numFailures = failures.length;
585-
assert numSuccess + numFailures == getNumShards()
586-
: "numSuccess(" + numSuccess + ") + numFailures(" + numFailures + ") != totalShards(" + getNumShards() + ")";
546+
final int numShards = getNumShards();
587547
return new SearchResponse(
588548
internalSearchResponse,
589549
scrollId,
590-
getNumShards(),
591-
numSuccess,
550+
numShards,
551+
numShards - failures.length,
592552
skippedCount,
593553
buildTookInMillis(),
594554
failures,

server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -215,33 +215,6 @@ public void testOnPhaseFailure() {
215215
assertEquals(requestIds, releasedContexts);
216216
}
217217

218-
public void testShardNotAvailableWithDisallowPartialFailures() {
219-
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
220-
AtomicReference<Exception> exception = new AtomicReference<>();
221-
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
222-
int numShards = randomIntBetween(2, 10);
223-
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(numShards);
224-
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
225-
// skip one to avoid the "all shards failed" failure.
226-
action.onShardResult(new SearchPhaseResult() {
227-
@Override
228-
public int getShardIndex() {
229-
return 0;
230-
}
231-
232-
@Override
233-
public SearchShardTarget getSearchShardTarget() {
234-
return new SearchShardTarget(null, null, null);
235-
}
236-
});
237-
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
238-
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
239-
assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage());
240-
assertEquals("test", searchPhaseExecutionException.getPhaseName());
241-
assertEquals(0, searchPhaseExecutionException.shardFailures().length);
242-
assertEquals(0, searchPhaseExecutionException.getSuppressed().length);
243-
}
244-
245218
private static ArraySearchPhaseResults<SearchPhaseResult> phaseResults(
246219
Set<ShardSearchContextId> contextIds,
247220
List<Tuple<String, String>> nodeLookups,

0 commit comments

Comments
 (0)