Skip to content

Commit 40e5ea3

Browse files
authored
Support weaker consistency model for S3 MPUs (#138663)
Adjusts the implementation of linearizable registers in S3 repositories to allow for the weaker multipart upload API semantics observed in practice. Also adjusts the S3 test fixture to (optionally) simulate the weaker semantics, and extends the repository analysis REST tests to cover both cases.
1 parent 93d9a0b commit 40e5ea3

File tree

11 files changed

+458
-166
lines changed

11 files changed

+458
-166
lines changed

docs/changelog/138663.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138663
2+
summary: Support weaker consistency model for S3 MPUs
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

docs/release-notes/known-issues.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,4 @@ This issue will be fixed in a future patch release (see [PR #126990](https://git
8989
* switching the order of the grouping keys (eg. `STATS ... BY keyword2, keyword1`, if the `keyword2` has a lower cardinality)
9090
* reducing the grouping key cardinality, by filtering out values before STATS
9191
92-
* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using an affected version of {{es}} and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue currently affects all supported versions of {{es}}. The plan to address it is described in [#137197](https://github.com/elastic/elasticsearch/issues/137197).
92+
* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using a version of {{es}} prior to 9.3.0 and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue is fixed in {{es}} version 9.3.0 by [#138663](https://github.com/elastic/elasticsearch/pull/138663).

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
2121
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
2222
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
23+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2324
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
2425
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
2526
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
@@ -56,6 +57,7 @@
5657
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
5758
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
5859
import org.elasticsearch.common.blobstore.support.BlobMetadata;
60+
import org.elasticsearch.common.bytes.BytesArray;
5961
import org.elasticsearch.common.bytes.BytesReference;
6062
import org.elasticsearch.common.collect.Iterators;
6163
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -832,11 +834,14 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
832834
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
833835
if ((e instanceof AwsServiceException awsServiceException)
834836
&& (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
837+
|| awsServiceException.statusCode() == RestStatus.CONFLICT.getStatus()
838+
|| awsServiceException.statusCode() == RestStatus.PRECONDITION_FAILED.getStatus()
835839
|| awsServiceException.statusCode() == RestStatus.OK.getStatus()
836840
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
837841
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
838842
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
839-
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
843+
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention.
844+
// Also if something else changed the blob out from under us then the If-Match check results in a 409 or a 412.
840845
delegate.onResponse(OptionalBytesReference.MISSING);
841846
} else {
842847
delegate.onFailure(e);
@@ -891,20 +896,20 @@ void innerRun(BytesReference expected, BytesReference updated, ActionListener<Op
891896
// cannot have observed a stale value, whereas if our operation ultimately fails then it doesn't matter what this read
892897
// observes.
893898

894-
.<OptionalBytesReference>andThen(l -> getRegister(purpose, rawKey, l))
899+
.<RegisterAndEtag>andThen(l -> getRegisterAndEtag(purpose, rawKey, l))
895900

896901
// Step 5: Perform the compare-and-swap by completing our upload iff the witnessed value matches the expected value.
897902

898-
.andThenApply(currentValue -> {
899-
if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) {
903+
.andThenApply(currentValueAndEtag -> {
904+
if (currentValueAndEtag.registerContents().equals(expected)) {
900905
logger.trace("[{}] completing upload [{}]", blobKey, uploadId);
901-
completeMultipartUpload(uploadId, partETag);
906+
completeMultipartUpload(uploadId, partETag, currentValueAndEtag.eTag());
902907
} else {
903908
// Best-effort attempt to clean up after ourselves.
904909
logger.trace("[{}] aborting upload [{}]", blobKey, uploadId);
905910
safeAbortMultipartUpload(uploadId);
906911
}
907-
return currentValue;
912+
return OptionalBytesReference.of(currentValueAndEtag.registerContents());
908913
})
909914

910915
// Step 6: Complete the listener.
@@ -1111,7 +1116,7 @@ private void abortMultipartUploadIfExists(String uploadId) {
11111116
}
11121117
}
11131118

1114-
private void completeMultipartUpload(String uploadId, String partETag) {
1119+
private void completeMultipartUpload(String uploadId, String partETag, String existingEtag) {
11151120
final var completeMultipartUploadRequestBuilder = CompleteMultipartUploadRequest.builder()
11161121
.bucket(bucket)
11171122
.key(blobKey)
@@ -1123,6 +1128,14 @@ private void completeMultipartUpload(String uploadId, String partETag) {
11231128
Operation.PUT_MULTIPART_OBJECT,
11241129
purpose
11251130
);
1131+
if (blobStore.supportsConditionalWrites(purpose)) {
1132+
if (existingEtag == null) {
1133+
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
1134+
} else {
1135+
completeMultipartUploadRequestBuilder.ifMatch(existingEtag);
1136+
}
1137+
}
1138+
11261139
final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
11271140
client.completeMultipartUpload(completeMultipartUploadRequest);
11281141
}
@@ -1146,8 +1159,23 @@ public void compareAndExchangeRegister(
11461159
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
11471160
}
11481161

1162+
/**
1163+
* @param registerContents Contents of the register blob; {@link BytesArray#EMPTY} if the blob is absent.
1164+
* @param eTag Etag of the register blob; {@code null} if and only if the blob is absent.
1165+
*/
1166+
private record RegisterAndEtag(BytesReference registerContents, String eTag) {
1167+
/**
1168+
* Sentinel value to indicate that the register blob is absent.
1169+
*/
1170+
static RegisterAndEtag ABSENT = new RegisterAndEtag(BytesArray.EMPTY, null);
1171+
}
1172+
11491173
@Override
11501174
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
1175+
getRegisterAndEtag(purpose, key, listener.map(registerAndEtag -> OptionalBytesReference.of(registerAndEtag.registerContents())));
1176+
}
1177+
1178+
void getRegisterAndEtag(OperationPurpose purpose, String key, ActionListener<RegisterAndEtag> listener) {
11511179
ActionListener.completeWith(listener, () -> {
11521180
final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS
11531181
? BackoffPolicy.noBackoff()
@@ -1163,11 +1191,14 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
11631191
var clientReference = blobStore.clientReference();
11641192
var s3Object = clientReference.client().getObject(getObjectRequest);
11651193
) {
1166-
return OptionalBytesReference.of(getRegisterUsingConsistentRead(s3Object, keyPath, key));
1194+
return new RegisterAndEtag(
1195+
getRegisterUsingConsistentRead(s3Object, keyPath, key),
1196+
getRequiredEtag(purpose, s3Object.response())
1197+
);
11671198
} catch (Exception attemptException) {
11681199
logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException);
11691200
if (attemptException instanceof SdkServiceException sdkException && sdkException.statusCode() == 404) {
1170-
return OptionalBytesReference.EMPTY;
1201+
return RegisterAndEtag.ABSENT;
11711202
} else if (finalException == null) {
11721203
finalException = attemptException;
11731204
} else if (finalException != attemptException) {
@@ -1191,6 +1222,23 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
11911222
});
11921223
}
11931224

1225+
/**
1226+
* @return the {@code ETag} header from a {@link GetObjectResponse}, failing with an exception if it is omitted (unless not required
1227+
* for the given {@link OperationPurpose}).
1228+
*/
1229+
private String getRequiredEtag(OperationPurpose purpose, GetObjectResponse getObjectResponse) {
1230+
final var etag = getObjectResponse.eTag();
1231+
if (Strings.hasText(etag)) {
1232+
return etag;
1233+
} else if (blobStore.supportsConditionalWrites(purpose)) {
1234+
throw new UnsupportedOperationException("GetObject response contained no ETag header, cannot perform conditional write");
1235+
} else {
1236+
// blob stores which do not support conditional writes may also not return ETag headers, but we won't use it anyway so return
1237+
// a non-null dummy value
1238+
return "es-missing-but-ignored-etag";
1239+
}
1240+
}
1241+
11941242
ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCountingRunnable refs) {
11951243
try (var clientReference = blobStore.clientReference()) {
11961244
final var bucket = blobStore.bucket();

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import java.util.regex.Pattern;
9494

9595
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
96+
import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes;
9697
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
9798
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
9899
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
@@ -248,7 +249,7 @@ protected BlobContainer createBlobContainer(
248249
S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
249250
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
250251
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
251-
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY),
252+
S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY) == Boolean.FALSE,
252253
repositoryMetadata,
253254
BigArrays.NON_RECYCLING_INSTANCE,
254255
new DeterministicTaskQueue().getThreadPool(),
@@ -1381,6 +1382,76 @@ public void handle(HttpExchange exchange) throws IOException {
13811382
);
13821383
}
13831384

1385+
public void testCompareAndExchangeWithConcurrentPutObject() throws Exception {
1386+
final var blobContainerPath = BlobPath.EMPTY.add(getTestName());
1387+
final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath);
1388+
1389+
final var objectContentsRequestedLatch = new CountDownLatch(1);
1390+
1391+
@SuppressForbidden(reason = "use a http server")
1392+
class AwaitsListMultipartUploads extends S3HttpHandler {
1393+
AwaitsListMultipartUploads() {
1394+
super("bucket");
1395+
}
1396+
1397+
@Override
1398+
public void handle(HttpExchange exchange) throws IOException {
1399+
if (parseRequest(exchange).isGetObjectRequest()) {
1400+
// delay the overwrite until the CAS is checking the object contents, forcing a race
1401+
objectContentsRequestedLatch.countDown();
1402+
}
1403+
super.handle(exchange);
1404+
}
1405+
}
1406+
1407+
httpServer.createContext("/", new AwaitsListMultipartUploads());
1408+
1409+
final var blobName = randomIdentifier();
1410+
final var initialValue = randomBytesReference(8);
1411+
final var overwriteValue = randomValueOtherThan(initialValue, () -> randomBytesReference(8));
1412+
final var casTargetValue = randomValueOtherThanMany(
1413+
v -> v.equals(initialValue) || v.equals(overwriteValue),
1414+
() -> randomBytesReference(8)
1415+
);
1416+
1417+
statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, initialValue, randomBoolean());
1418+
1419+
runInParallel(
1420+
() -> safeAwait(
1421+
l -> statefulBlobContainer.compareAndExchangeRegister(
1422+
randomPurpose(),
1423+
blobName,
1424+
initialValue,
1425+
casTargetValue,
1426+
l.map(result -> {
1427+
// Really anything can happen here: success (sees initialValue) or failure (sees overwriteValue), or contention
1428+
if (result.isPresent()) {
1429+
assertThat(
1430+
result.toString(),
1431+
result.bytesReference(),
1432+
anyOf(equalBytes(initialValue), equalBytes(overwriteValue))
1433+
);
1434+
}
1435+
return null;
1436+
})
1437+
)
1438+
),
1439+
() -> {
1440+
try {
1441+
safeAwait(objectContentsRequestedLatch);
1442+
statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, overwriteValue, false);
1443+
} catch (IOException e) {
1444+
throw new AssertionError("writeBlobAtomic failed", e);
1445+
}
1446+
}
1447+
);
1448+
1449+
// If the CAS happens before the overwrite then we'll see the overwritten value, whereas if the CAS happens second then it will
1450+
// fail because the value was overwritten leaving the overwritten value in place. If, however, the CAS were not atomic with respect
1451+
// to other non-CAS-based writes, then we would see casTargetValue here:
1452+
assertThat(Streams.readFully(statefulBlobContainer.readBlob(randomPurpose(), blobName)), equalBytes(overwriteValue));
1453+
}
1454+
13841455
@Override
13851456
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
13861457
// some attempts make meaningful progress and do not count towards the max retry limit
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package fixture.s3;
11+
12+
/**
 * Enumerates the two consistency behaviours that S3-compatible object stores exhibit around multipart
 * uploads and conditional writes. AWS S3's multipart upload APIs are weaker than originally claimed
 * (see support cases 10837136441 and 176070774900712) although its {@code If-Match} and
 * {@code If-None-Match} conditional writes are strongly consistent. Other object storage suppliers
 * take the opposite approach: strongly-consistent multipart upload APIs, with the conditional-write
 * headers ignored. Elasticsearch's behaviour is verified against both models.
 */
public enum S3ConsistencyModel {
    /**
     * AWS S3's observed behaviour: conditional writes are honoured, but the multipart upload APIs are
     * somewhat weak (e.g. an abort may return while the underlying write operation is still in flight).
     */
    AWS_DEFAULT(true, false),

    /**
     * The alternative behaviour: the multipart upload APIs are strongly consistent, while the
     * {@code If-Match} and {@code If-None-Match} headers are ignored so all writes are unconditional.
     */
    STRONG_MPUS(false, true);

    private final boolean hasConditionalWrites;
    private final boolean hasStrongMultipartUploads;

    S3ConsistencyModel(boolean hasConditionalWrites, boolean hasStrongMultipartUploads) {
        this.hasConditionalWrites = hasConditionalWrites;
        this.hasStrongMultipartUploads = hasStrongMultipartUploads;
    }

    /** @return whether {@code If-Match}/{@code If-None-Match} conditional writes are honoured in this model */
    public boolean hasConditionalWrites() {
        return hasConditionalWrites;
    }

    /** @return whether the multipart upload APIs are strongly consistent in this model */
    public boolean hasStrongMultipartUploads() {
        return hasStrongMultipartUploads;
    }
}

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public void handle(final HttpExchange exchange) throws IOException {
7070
throw e;
7171
}
7272
}
73+
74+
@Override
protected S3ConsistencyModel consistencyModel() {
    // Delegate to the enclosing fixture's protected consistencyModel() (default AWS_DEFAULT) so that
    // the per-request handler simulates whichever model the fixture subclass chose to provide.
    return S3HttpFixture.this.consistencyModel();
}
}
7378
};
7479
}
7580

@@ -109,4 +114,8 @@ protected void after() {
109114
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
110115
}
111116
}
117+
118+
/**
 * The {@link S3ConsistencyModel} this fixture simulates; defaults to {@link S3ConsistencyModel#AWS_DEFAULT}.
 * Protected so that fixture subclasses can override it to simulate the alternative model.
 */
protected S3ConsistencyModel consistencyModel() {
    return S3ConsistencyModel.AWS_DEFAULT;
}
112121
}

0 commit comments

Comments
 (0)