Skip to content

Commit b5cc1c4

Browse files
authored
Rename S3 CompareAndExchangeOperation (#138422)
As a preparatory step to providing an alternative implementation of the compare-and-exchange operation on S3, this commit renames the existing implementation and shifts the necessary exception-handling logic inside its `run` method for improved encapsulation.
1 parent 83737eb commit b5cc1c4

File tree

1 file changed

+37
-21
lines changed

1 file changed

+37
-21
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,10 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
783783
}
784784
}
785785

786-
private class CompareAndExchangeOperation {
786+
/**
787+
* An implementation of {@link BlobContainer#compareAndExchangeRegister} based on strongly-consistent multipart upload APIs.
788+
*/
789+
private class MultipartUploadCompareAndExchangeOperation {
787790

788791
private final OperationPurpose purpose;
789792
private final S3Client client;
@@ -792,7 +795,13 @@ private class CompareAndExchangeOperation {
792795
private final String blobKey;
793796
private final ThreadPool threadPool;
794797

795-
CompareAndExchangeOperation(OperationPurpose purpose, S3Client client, String bucket, String key, ThreadPool threadPool) {
798+
MultipartUploadCompareAndExchangeOperation(
799+
OperationPurpose purpose,
800+
S3Client client,
801+
String bucket,
802+
String key,
803+
ThreadPool threadPool
804+
) {
796805
this.purpose = purpose;
797806
this.client = client;
798807
this.bucket = bucket;
@@ -802,6 +811,23 @@ private class CompareAndExchangeOperation {
802811
}
803812

804813
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
814+
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
815+
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
816+
if (e instanceof AwsServiceException awsServiceException
817+
&& (awsServiceException.statusCode() == 404
818+
|| awsServiceException.statusCode() == 200
819+
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
820+
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
821+
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
822+
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
823+
delegate.onResponse(OptionalBytesReference.MISSING);
824+
} else {
825+
delegate.onFailure(e);
826+
}
827+
}));
828+
}
829+
830+
void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
805831
BlobContainerUtils.ensureValidRegisterContent(updated);
806832

807833
if (hasPreexistingUploads()) {
@@ -1094,25 +1120,15 @@ public void compareAndExchangeRegister(
10941120
ActionListener<OptionalBytesReference> listener
10951121
) {
10961122
final var clientReference = blobStore.clientReference();
1097-
ActionListener.run(ActionListener.releaseAfter(listener.delegateResponse((delegate, e) -> {
1098-
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", key), e);
1099-
if (e instanceof AwsServiceException awsServiceException
1100-
&& (awsServiceException.statusCode() == 404
1101-
|| awsServiceException.statusCode() == 200
1102-
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
1103-
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
1104-
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
1105-
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
1106-
delegate.onResponse(OptionalBytesReference.MISSING);
1107-
} else {
1108-
delegate.onFailure(e);
1109-
}
1110-
}), clientReference),
1111-
l -> new CompareAndExchangeOperation(purpose, clientReference.client(), blobStore.bucket(), key, blobStore.getThreadPool()).run(
1112-
expected,
1113-
updated,
1114-
l
1115-
)
1123+
ActionListener.run(
1124+
ActionListener.releaseBefore(clientReference, listener),
1125+
l -> new MultipartUploadCompareAndExchangeOperation(
1126+
purpose,
1127+
clientReference.client(),
1128+
blobStore.bucket(),
1129+
key,
1130+
blobStore.getThreadPool()
1131+
).run(expected, updated, l)
11161132
);
11171133
}
11181134

0 commit comments

Comments
 (0)