Skip to content

Commit 5dec80b

Browse files
authored
Fix exception handling in S3 compareAndExchangeRegister (#138488)
In #138422 we shifted the exception handling for `compareAndExchangeRegister` into its `run()` method, but this introduced a subtle bug: an `AwsServiceException` may be thrown synchronously, bypassing the handling added to the listener with the `delegateResponse` call. This commit reinstates the missing exception handling.
1 parent 667ffc0 commit 5dec80b

File tree

2 files changed

+49
-16
lines changed

2 files changed

+49
-16
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -810,12 +810,12 @@ private class MultipartUploadCompareAndExchangeOperation {
810810
this.threadPool = threadPool;
811811
}
812812

813-
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
814-
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
813+
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) {
814+
ActionListener.run(listener.delegateResponse((delegate, e) -> {
815815
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
816-
if (e instanceof AwsServiceException awsServiceException
817-
&& (awsServiceException.statusCode() == 404
818-
|| awsServiceException.statusCode() == 200
816+
if ((e instanceof AwsServiceException awsServiceException)
817+
&& (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
818+
|| awsServiceException.statusCode() == RestStatus.OK.getStatus()
819819
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
820820
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
821821
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
@@ -824,7 +824,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
824824
} else {
825825
delegate.onFailure(e);
826826
}
827-
}));
827+
}), l -> innerRun(expected, updated, l));
828828
}
829829

830830
void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
@@ -1120,16 +1120,13 @@ public void compareAndExchangeRegister(
11201120
ActionListener<OptionalBytesReference> listener
11211121
) {
11221122
final var clientReference = blobStore.clientReference();
1123-
ActionListener.run(
1124-
ActionListener.releaseBefore(clientReference, listener),
1125-
l -> new MultipartUploadCompareAndExchangeOperation(
1126-
purpose,
1127-
clientReference.client(),
1128-
blobStore.bucket(),
1129-
key,
1130-
blobStore.getThreadPool()
1131-
).run(expected, updated, l)
1132-
);
1123+
new MultipartUploadCompareAndExchangeOperation(
1124+
purpose,
1125+
clientReference.client(),
1126+
blobStore.bucket(),
1127+
key,
1128+
blobStore.getThreadPool()
1129+
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
11331130
}
11341131

11351132
@Override

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,42 @@ public void testRetryOn403InStateless() {
13461346
assertEquals(denyAccessAfterAttempt <= maxRetries ? 1 : 0, accessDeniedResponseCount.get());
13471347
}
13481348

1349+
public void testUploadNotFoundInCompareAndExchange() {
1350+
final var blobContainerPath = BlobPath.EMPTY.add(getTestName());
1351+
final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath);
1352+
1353+
@SuppressForbidden(reason = "use a http server")
1354+
class RejectsUploadPartRequests extends S3HttpHandler {
1355+
RejectsUploadPartRequests() {
1356+
super("bucket");
1357+
}
1358+
1359+
@Override
1360+
public void handle(HttpExchange exchange) throws IOException {
1361+
if (parseRequest(exchange).isUploadPartRequest()) {
1362+
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
1363+
} else {
1364+
super.handle(exchange);
1365+
}
1366+
}
1367+
}
1368+
1369+
httpServer.createContext("/", new RejectsUploadPartRequests());
1370+
1371+
safeAwait(
1372+
l -> statefulBlobContainer.compareAndExchangeRegister(
1373+
randomPurpose(),
1374+
"not_found_register",
1375+
BytesArray.EMPTY,
1376+
new BytesArray(new byte[1]),
1377+
l.map(result -> {
1378+
assertFalse(result.isPresent());
1379+
return null;
1380+
})
1381+
)
1382+
);
1383+
}
1384+
13491385
private static String getBase16MD5Digest(BytesReference bytesReference) {
13501386
return MessageDigests.toHexString(MessageDigests.digest(bytesReference, MessageDigests.md5()));
13511387
}

0 commit comments

Comments
 (0)