Skip to content

Commit 8641ba8

Browse files
authored
fix(s3stream): add pending requests await timeout for S3Stream#close (#2751)
1 parent 387557b commit 8641ba8

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,11 @@ public CompletableFuture<Void> close(boolean force) {
381381

382382
// await all pending append/fetch/trim request
383383
List<CompletableFuture<?>> pendingRequests = new ArrayList<>(pendingAppends);
384+
// add timeout to prevent the fetch(catch-up read) network throttle to block Stream#close.
384385
if (GlobalSwitch.STRICT) {
385-
pendingRequests.addAll(pendingFetches);
386+
pendingRequests.addAll(FutureUtil.timeoutAndSilence(pendingFetches.stream(), 10, TimeUnit.SECONDS));
386387
}
387-
pendingRequests.add(lastPendingTrim);
388+
pendingRequests.add(FutureUtil.timeoutAndSilence(lastPendingTrim, 10, TimeUnit.SECONDS));
388389
if (force) {
389390
pendingRequests.forEach(cf -> cf.completeExceptionally(new StreamClientException(ErrorCode.UNEXPECTED, "FORCE_CLOSE")));
390391
}

s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.util.Iterator;
26+
import java.util.List;
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.CompletionException;
2829
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,8 @@
3132
import java.util.concurrent.TimeoutException;
3233
import java.util.concurrent.atomic.AtomicLong;
3334
import java.util.function.Supplier;
35+
import java.util.stream.Collectors;
36+
import java.util.stream.Stream;
3437

3538
import io.netty.util.HashedWheelTimer;
3639
import io.netty.util.Timeout;
@@ -165,4 +168,11 @@ public static <T> CompletableFuture<T> timeoutWithNewReturn(CompletableFuture<T>
165168
return newCf;
166169
}
167170

171+
public static List<CompletableFuture<?>> timeoutAndSilence(Stream<CompletableFuture<?>> stream, long timeout, TimeUnit timeUnit) {
172+
return stream.map(l -> timeoutAndSilence(l, timeout, timeUnit)).collect(Collectors.toList());
173+
}
174+
175+
public static <T> CompletableFuture<T> timeoutAndSilence(CompletableFuture<T> cf, long timeout, TimeUnit timeUnit) {
176+
return cf.orTimeout(timeout, timeUnit).exceptionally(ex -> null);
177+
}
168178
}

0 commit comments

Comments
 (0)