Skip to content

Commit 730aa9f

Browse files
authored
fix(s3wal): 503 SlowDown during WAL recovery from S3 (#3198)
1 parent 1f6ff1c commit 730aa9f

File tree

2 files changed

+46
-14
lines changed

2 files changed

+46
-14
lines changed

s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -935,6 +935,34 @@ protected <T> boolean bucketCheck(int bucketId, CompletableFuture<T> cf) {
935935
return true;
936936
}
937937

938+
protected <T> CompletableFuture<T> retryOnThrottle(Supplier<CompletableFuture<T>> operation,
939+
S3Operation operationType, int retryCount) {
940+
941+
CompletableFuture<T> cf = new CompletableFuture<>();
942+
operation.get().whenComplete((result, ex) -> {
943+
if (ex == null) {
944+
cf.complete(result);
945+
return;
946+
}
947+
948+
Throwable cause = FutureUtil.cause(ex);
949+
if (!isThrottled(cause, retryCount)) {
950+
cf.completeExceptionally(cause);
951+
return;
952+
}
953+
954+
int delay = retryDelay(operationType, retryCount);
955+
scheduler.schedule(() ->
956+
retryOnThrottle(operation, operationType, retryCount + 1)
957+
.whenComplete((r, e) -> {
958+
if (e != null) cf.completeExceptionally(e);
959+
else cf.complete(r);
960+
}),
961+
delay, TimeUnit.MILLISECONDS);
962+
});
963+
return cf;
964+
}
965+
938966
protected DeleteObjectsAccumulator newDeleteObjectsAccumulator() {
939967
return new DeleteObjectsAccumulator(this::doDeleteObjects);
940968
}

s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
5858
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
5959
import software.amazon.awssdk.core.exception.SdkClientException;
60+
import software.amazon.awssdk.core.retry.RetryMode;
6061
import software.amazon.awssdk.http.HttpStatusCode;
6162
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
6263
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
@@ -309,21 +310,23 @@ public CompletableFuture<Void> doDeleteObjects(List<String> objectKeys) {
309310
.delete(Delete.builder().objects(toDeleteKeys).build())
310311
.build();
311312

312-
CompletableFuture<Void> cf = new CompletableFuture<>();
313-
this.writeS3Client.deleteObjects(request)
314-
.thenAccept(resp -> {
315-
try {
316-
checkDeleteObjectsResponse(resp);
317-
cf.complete(null);
318-
} catch (Throwable ex) {
313+
return retryOnThrottle(() -> {
314+
CompletableFuture<Void> cf = new CompletableFuture<>();
315+
writeS3Client.deleteObjects(request)
316+
.thenAccept(resp -> {
317+
try {
318+
checkDeleteObjectsResponse(resp);
319+
cf.complete(null);
320+
} catch (Throwable ex) {
321+
cf.completeExceptionally(ex);
322+
}
323+
})
324+
.exceptionally(ex -> {
319325
cf.completeExceptionally(ex);
320-
}
321-
})
322-
.exceptionally(ex -> {
323-
cf.completeExceptionally(ex);
324-
return null;
325-
});
326-
return cf;
326+
return null;
327+
});
328+
return cf;
329+
}, S3Operation.DELETE_OBJECTS, 0);
327330
}
328331

329332
@Override
@@ -438,6 +441,7 @@ protected ClientOverrideConfiguration clientOverrideConfiguration(long apiCallTi
438441
return ClientOverrideConfiguration.builder()
439442
.apiCallTimeout(Duration.ofMillis(apiCallTimeoutMs))
440443
.apiCallAttemptTimeout(Duration.ofMillis(apiCallAttemptTimeoutMs))
444+
.retryStrategy(RetryMode.STANDARD)
441445
.build();
442446
}
443447

0 commit comments

Comments
 (0)