Skip to content

Commit c5672b3

Browse files
authored
fix(s3stream): add pending requests await timeout for S3Stream#close (#2750)
Signed-off-by: Robin Han <[email protected]>
1 parent be42048 commit c5672b3

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,10 +393,11 @@ public CompletableFuture<Void> close(boolean force) {
393393

394394
// await all pending append/fetch/trim request
395395
List<CompletableFuture<?>> pendingRequests = new ArrayList<>(pendingAppends);
396+
// add timeout to prevent the fetch(catch-up read) network throttle to block Stream#close.
396397
if (GlobalSwitch.STRICT) {
397-
pendingRequests.addAll(pendingFetches);
398+
pendingRequests.addAll(FutureUtil.timeoutAndSilence(pendingFetches.stream(), 10, TimeUnit.SECONDS));
398399
}
399-
pendingRequests.add(lastPendingTrim);
400+
pendingRequests.add(FutureUtil.timeoutAndSilence(lastPendingTrim, 10, TimeUnit.SECONDS));
400401
if (force) {
401402
pendingRequests.forEach(cf -> cf.completeExceptionally(new StreamClientException(ErrorCode.UNEXPECTED, "FORCE_CLOSE")));
402403
}

s3stream/src/main/java/com/automq/stream/utils/FutureUtil.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.util.Iterator;
26+
import java.util.List;
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.CompletionException;
2829
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,8 @@
3132
import java.util.concurrent.TimeoutException;
3233
import java.util.concurrent.atomic.AtomicLong;
3334
import java.util.function.Supplier;
35+
import java.util.stream.Collectors;
36+
import java.util.stream.Stream;
3437

3538
import io.netty.util.HashedWheelTimer;
3639
import io.netty.util.Timeout;
@@ -165,4 +168,11 @@ public static <T> CompletableFuture<T> timeoutWithNewReturn(CompletableFuture<T>
165168
return newCf;
166169
}
167170

171+
public static List<CompletableFuture<?>> timeoutAndSilence(Stream<CompletableFuture<?>> stream, long timeout, TimeUnit timeUnit) {
172+
return stream.map(l -> timeoutAndSilence(l, timeout, timeUnit)).collect(Collectors.toList());
173+
}
174+
175+
public static <T> CompletableFuture<T> timeoutAndSilence(CompletableFuture<T> cf, long timeout, TimeUnit timeUnit) {
176+
return cf.orTimeout(timeout, timeUnit).exceptionally(ex -> null);
177+
}
168178
}

0 commit comments

Comments
 (0)