Skip to content

Commit 01c09cb

Browse files
authored
Simplify expectedFinalRegisterValue computation (#131274)
In repository analysis we keep an `AtomicLong` counting the number of successfully-completed increments on the contended register, but we only check this value if all the increments succeed. We know how many increments we enqueue up-front so there's no need to count them as they complete. This commit removes the unnecessary counter.
1 parent f8b89a3 commit 01c09cb

File tree

1 file changed

+4
-10
lines changed

1 file changed

+4
-10
lines changed

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import java.util.Set;
7878
import java.util.concurrent.Semaphore;
7979
import java.util.concurrent.atomic.AtomicBoolean;
80-
import java.util.concurrent.atomic.AtomicLong;
8180
import java.util.concurrent.atomic.AtomicReference;
8281
import java.util.function.Consumer;
8382
import java.util.function.LongSupplier;
@@ -380,7 +379,6 @@ public static class AsyncAction {
380379
// choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction
381380
private final String blobPath = "temp-analysis-" + UUIDs.randomBase64UUID();
382381

383-
private final AtomicLong expectedRegisterValue = new AtomicLong();
384382
private final Queue<Consumer<Releasable>> queue = ConcurrentCollections.newQueue();
385383
private final AtomicReference<Exception> failure = new AtomicReference<>();
386384
private final Semaphore innerFailures = new Semaphore(5); // limit the number of suppressed failures
@@ -486,16 +484,17 @@ public void run() {
486484
if (minClusterTransportVersion.onOrAfter(TransportVersions.V_8_8_0)) {
487485
final String contendedRegisterName = CONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random);
488486
final AtomicBoolean contendedRegisterAnalysisComplete = new AtomicBoolean();
487+
final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
489488
try (
490489
var registerRefs = new RefCountingRunnable(
491490
finalRegisterValueVerifier(
492491
contendedRegisterName,
492+
registerOperations,
493493
random,
494494
Releasables.wrap(requestRefs.acquire(), () -> contendedRegisterAnalysisComplete.set(true))
495495
)
496496
)
497497
) {
498-
final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
499498
for (int i = 0; i < registerOperations; i++) {
500499
final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request(
501500
request.getRepositoryName(),
@@ -631,9 +630,7 @@ private void runContendedRegisterAnalysis(Releasable ref, ContendedRegisterAnaly
631630
TransportRequestOptions.EMPTY,
632631
new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
633632
@Override
634-
public void onResponse(ActionResponse.Empty response) {
635-
expectedRegisterValue.incrementAndGet();
636-
}
633+
public void onResponse(ActionResponse.Empty response) {}
637634

638635
@Override
639636
public void onFailure(Exception exp) {
@@ -647,7 +644,7 @@ public void onFailure(Exception exp) {
647644
}
648645
}
649646

650-
private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) {
647+
private Runnable finalRegisterValueVerifier(String registerName, int expectedFinalRegisterValue, Random random, Releasable ref) {
651648
return new Runnable() {
652649

653650
final CheckedConsumer<ActionListener<OptionalBytesReference>, Exception> finalValueReader = switch (random.nextInt(3)) {
@@ -706,12 +703,9 @@ public String toString() {
706703
}
707704
};
708705

709-
long expectedFinalRegisterValue = Long.MIN_VALUE;
710-
711706
@Override
712707
public void run() {
713708
if (isRunning()) {
714-
expectedFinalRegisterValue = expectedRegisterValue.get();
715709
transportService.getThreadPool()
716710
.executor(ThreadPool.Names.SNAPSHOT)
717711
.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() {

0 commit comments

Comments
 (0)