Skip to content

Commit 6f55796

Browse files
authored
Improve lost-increment message in repo analysis (#131200)
Today repository analysis may fail with a message like the following: [test-repo] register [test-register-contended-F_NNXHrSSDGveoeyj1skwg] should have value [10] but instead had value [OptionalBytesReference[00 00 00 00 00 00 00 09]] This is confusing because one might interpret `should have value [10]` as an indication that Elasticsearch definitely wrote this value to the register, leaving you trying to work out how that particular write was lost. In fact it can be more subtle than that, we only believe the register blob should have this value because we know we completed 10 supposedly-atomic increment operations, and the failure could instead be that these operations are not as atomic as they need to be and that one or more of the increments was lost. This commit makes the message more verbose, clarifying that this failure could be an atomicity problem rather than a simple lost write: [test-repo] Successfully completed all [10] atomic increments of register [test-register-contended-F_NNXHrSSDGveoeyj1skwg] so its expected value is [OptionalBytesReference[00 00 00 00 00 00 00 0a]], but reading its value with [getRegister] unexpectedly yielded [OptionalBytesReference[00 00 00 00 00 00 00 09]]. This anomaly may indicate an atomicity failure amongst concurrent compare-and-exchange operations on registers in this repository.
1 parent 70176c8 commit 6f55796

File tree

3 files changed

+155
-56
lines changed

3 files changed

+155
-56
lines changed

docs/changelog/131200.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131200
2+
summary: Improve lost-increment message in repo analysis
3+
area: Snapshot/Restore
4+
type: enhancement
5+
issues: []

x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.hamcrest.Matchers.containsString;
7777
import static org.hamcrest.Matchers.equalTo;
7878
import static org.hamcrest.Matchers.instanceOf;
79+
import static org.hamcrest.Matchers.matchesPattern;
7980
import static org.hamcrest.Matchers.nullValue;
8081

8182
public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
@@ -385,6 +386,55 @@ public BytesReference onContendedCompareAndExchange(BytesRegister register, Byte
385386
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage());
386387
}
387388

389+
public void testFailsOnLostIncrement() {
390+
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
391+
final AtomicBoolean registerWasCorrupted = new AtomicBoolean();
392+
393+
blobStore.setDisruption(new Disruption() {
394+
@Override
395+
public BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
396+
if (expected.equals(updated) == false // not the initial read
397+
&& updated.length() == Long.BYTES // not the final write
398+
&& randomBoolean()
399+
&& register.get().equals(expected) // would have succeeded
400+
&& registerWasCorrupted.compareAndSet(false, true)) {
401+
402+
// indicate success without actually applying the update
403+
return expected;
404+
}
405+
406+
return register.compareAndExchange(expected, updated);
407+
}
408+
});
409+
410+
safeAwait((ActionListener<RepositoryAnalyzeAction.Response> l) -> analyseRepository(request, l.delegateResponse((ll, e) -> {
411+
if (ExceptionsHelper.unwrapCause(e) instanceof RepositoryVerificationException repositoryVerificationException) {
412+
assertAnalysisFailureMessage(repositoryVerificationException.getMessage());
413+
assertTrue(
414+
"did not lose increment, so why did the verification fail?",
415+
// clear flag for final assertion
416+
registerWasCorrupted.compareAndSet(true, false)
417+
);
418+
assertThat(
419+
asInstanceOf(
420+
RepositoryVerificationException.class,
421+
ExceptionsHelper.unwrapCause(repositoryVerificationException.getCause())
422+
).getMessage(),
423+
matchesPattern("""
424+
\\[test-repo] Successfully completed all \\[.*] atomic increments of register \\[test-register-contended-.*] \
425+
so its expected value is \\[OptionalBytesReference\\[.*]], but reading its value with \\[.*] unexpectedly \
426+
yielded \\[OptionalBytesReference\\[.*]]\\. This anomaly may indicate an atomicity failure amongst concurrent \
427+
compare-and-exchange operations on registers in this repository\\.""")
428+
);
429+
ll.onResponse(null);
430+
} else {
431+
ll.onFailure(e);
432+
}
433+
})));
434+
435+
assertFalse(registerWasCorrupted.get());
436+
}
437+
388438
public void testFailsIfRegisterHoldsSpuriousValue() {
389439
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
390440

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java

Lines changed: 100 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4343
import org.elasticsearch.common.util.concurrent.EsExecutors;
4444
import org.elasticsearch.common.util.concurrent.ThrottledIterator;
45+
import org.elasticsearch.core.CheckedConsumer;
4546
import org.elasticsearch.core.Releasable;
4647
import org.elasticsearch.core.Releasables;
4748
import org.elasticsearch.core.TimeValue;
@@ -647,67 +648,110 @@ public void onFailure(Exception exp) {
647648
}
648649

649650
private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) {
650-
return () -> {
651-
if (isRunning()) {
652-
final var expectedFinalRegisterValue = expectedRegisterValue.get();
653-
transportService.getThreadPool()
654-
.executor(ThreadPool.Names.SNAPSHOT)
655-
.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<OptionalBytesReference>() {
656-
@Override
657-
public void onResponse(OptionalBytesReference actualFinalRegisterValue) {
658-
if (actualFinalRegisterValue.isPresent() == false
659-
|| longFromBytes(actualFinalRegisterValue.bytesReference()) != expectedFinalRegisterValue) {
660-
fail(
661-
new RepositoryVerificationException(
662-
request.getRepositoryName(),
663-
Strings.format(
664-
"register [%s] should have value [%d] but instead had value [%s]",
665-
registerName,
666-
expectedFinalRegisterValue,
667-
actualFinalRegisterValue
651+
return new Runnable() {
652+
653+
final CheckedConsumer<ActionListener<OptionalBytesReference>, Exception> finalValueReader = switch (random.nextInt(3)) {
654+
case 0 -> new CheckedConsumer<ActionListener<OptionalBytesReference>, Exception>() {
655+
@Override
656+
public void accept(ActionListener<OptionalBytesReference> listener) {
657+
getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener);
658+
}
659+
660+
@Override
661+
public String toString() {
662+
return "getRegister";
663+
}
664+
};
665+
case 1 -> new CheckedConsumer<ActionListener<OptionalBytesReference>, Exception>() {
666+
@Override
667+
public void accept(ActionListener<OptionalBytesReference> listener) {
668+
getBlobContainer().compareAndExchangeRegister(
669+
OperationPurpose.REPOSITORY_ANALYSIS,
670+
registerName,
671+
bytesFromLong(expectedFinalRegisterValue),
672+
new BytesArray(new byte[] { (byte) 0xff }),
673+
listener
674+
);
675+
}
676+
677+
@Override
678+
public String toString() {
679+
return "compareAndExchangeRegister";
680+
}
681+
};
682+
case 2 -> new CheckedConsumer<ActionListener<OptionalBytesReference>, Exception>() {
683+
@Override
684+
public void accept(ActionListener<OptionalBytesReference> listener) {
685+
getBlobContainer().compareAndSetRegister(
686+
OperationPurpose.REPOSITORY_ANALYSIS,
687+
registerName,
688+
bytesFromLong(expectedFinalRegisterValue),
689+
new BytesArray(new byte[] { (byte) 0xff }),
690+
listener.map(
691+
b -> b
692+
? OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue))
693+
: OptionalBytesReference.MISSING
694+
)
695+
);
696+
}
697+
698+
@Override
699+
public String toString() {
700+
return "compareAndSetRegister";
701+
}
702+
};
703+
default -> {
704+
assert false;
705+
throw new IllegalStateException();
706+
}
707+
};
708+
709+
long expectedFinalRegisterValue = Long.MIN_VALUE;
710+
711+
@Override
712+
public void run() {
713+
if (isRunning()) {
714+
expectedFinalRegisterValue = expectedRegisterValue.get();
715+
transportService.getThreadPool()
716+
.executor(ThreadPool.Names.SNAPSHOT)
717+
.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() {
718+
@Override
719+
public void onResponse(OptionalBytesReference actualFinalRegisterValue) {
720+
if (actualFinalRegisterValue.isPresent() == false
721+
|| longFromBytes(actualFinalRegisterValue.bytesReference()) != expectedFinalRegisterValue) {
722+
fail(
723+
new RepositoryVerificationException(
724+
request.getRepositoryName(),
725+
Strings.format(
726+
"""
727+
Successfully completed all [%d] atomic increments of register [%s] so its expected \
728+
value is [%s], but reading its value with [%s] unexpectedly yielded [%s]. This \
729+
anomaly may indicate an atomicity failure amongst concurrent compare-and-exchange \
730+
operations on registers in this repository.""",
731+
expectedFinalRegisterValue,
732+
registerName,
733+
OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue)),
734+
finalValueReader.toString(),
735+
actualFinalRegisterValue
736+
)
668737
)
669-
)
670-
);
738+
);
739+
}
671740
}
672-
}
673741

674-
@Override
675-
public void onFailure(Exception exp) {
676-
// Registers are not supported on all repository types, and that's ok.
677-
if (exp instanceof UnsupportedOperationException == false) {
678-
fail(exp);
679-
}
680-
}
681-
}, ref), listener -> {
682-
switch (random.nextInt(3)) {
683-
case 0 -> getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener);
684-
case 1 -> getBlobContainer().compareAndExchangeRegister(
685-
OperationPurpose.REPOSITORY_ANALYSIS,
686-
registerName,
687-
bytesFromLong(expectedFinalRegisterValue),
688-
new BytesArray(new byte[] { (byte) 0xff }),
689-
listener
690-
);
691-
case 2 -> getBlobContainer().compareAndSetRegister(
692-
OperationPurpose.REPOSITORY_ANALYSIS,
693-
registerName,
694-
bytesFromLong(expectedFinalRegisterValue),
695-
new BytesArray(new byte[] { (byte) 0xff }),
696-
listener.map(
697-
b -> b
698-
? OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue))
699-
: OptionalBytesReference.MISSING
700-
)
701-
);
702-
default -> {
703-
assert false;
704-
throw new IllegalStateException();
742+
@Override
743+
public void onFailure(Exception exp) {
744+
// Registers are not supported on all repository types, and that's ok.
745+
if (exp instanceof UnsupportedOperationException == false) {
746+
fail(exp);
747+
}
705748
}
706-
}
707-
}));
708-
} else {
709-
ref.close();
749+
}, ref), finalValueReader));
750+
} else {
751+
ref.close();
752+
}
710753
}
754+
711755
};
712756
}
713757

0 commit comments

Comments
 (0)