Skip to content

Commit 83ddbe1

Browse files
authored
feat(s3stream): support call #close(force=true) after #close(force=false) (#1854)
Signed-off-by: Robin Han <[email protected]>
1 parent 39c0f5d commit 83ddbe1

File tree

2 files changed

+39
-18
lines changed

2 files changed

+39
-18
lines changed

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class S3Stream implements Stream {
8181
private final AsyncNetworkBandwidthLimiter networkOutboundLimiter;
8282
private long startOffset;
8383
private CompletableFuture<Void> lastPendingTrim = CompletableFuture.completedFuture(null);
84+
private CompletableFuture<Void> closeCf;
8485

8586
public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
8687
StreamManager streamManager) {
@@ -348,6 +349,15 @@ public CompletableFuture<Void> close(boolean force) {
348349
if (force) {
349350
pendingRequests.forEach(cf -> cf.completeExceptionally(new StreamClientException(ErrorCode.UNEXPECTED, "FORCE_CLOSE")));
350351
}
352+
// Supported repeated call #close.
353+
// And it supports the following scenario:
354+
// 1. #close(false): try gracefully close the stream. (await the pending request complete, upload data to object storage and close stream)
355+
// 2. #close(true): try force close the stream. (complete the pending request with exception, upload data to object storage and close stream)
356+
// Even #close(false) is called, we still could use #close(true) to force close the stream.
357+
if (closeCf != null) {
358+
return closeCf;
359+
}
360+
351361
CompletableFuture<Void> awaitPendingRequestsCf = CompletableFuture.allOf(pendingRequests.toArray(new CompletableFuture[0]));
352362
CompletableFuture<Void> closeCf = new CompletableFuture<>();
353363

@@ -366,6 +376,7 @@ public CompletableFuture<Void> close(boolean force) {
366376
S3StreamMetricsManager.removePendingStreamFetchLatencySupplier(streamId);
367377
});
368378

379+
this.closeCf = closeCf;
369380
return closeCf;
370381
} finally {
371382
writeLock.unlock();

s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,12 @@ public class S3StreamClient implements StreamClient {
8383
private final ReentrantLock lock = new ReentrantLock();
8484

8585
final Map<Long, CompletableFuture<Stream>> openingStreams = new ConcurrentHashMap<>();
86-
final Map<Long, CompletableFuture<Stream>> closingStreams = new ConcurrentHashMap<>();
86+
final Map<Long, StreamWrapper> closingStreams = new ConcurrentHashMap<>();
8787

8888
private final List<StreamLifeCycleListener> streamLifeCycleListeners = new CopyOnWriteArrayList<>();
8989

9090
private boolean closed;
91+
private boolean forceCloseMark;
9192

9293
@SuppressWarnings("unused")
9394
public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager,
@@ -201,17 +202,23 @@ public void shutdown() {
201202
}
202203

203204
TimerUtil timerUtil = new TimerUtil();
204-
closeStreams(false);
205+
closeStreams();
205206
LOGGER.info("S3StreamClient shutdown, cost {}ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
206207
}
207208

208-
private void closeStreams(boolean force) {
209+
private void closeStreams() {
210+
if (forceCloseMark) {
211+
runInLock(() -> closingStreams.forEach((streamId, stream) -> {
212+
LOGGER.info("force close closing stream, streamId={}", streamId);
213+
stream.close(true);
214+
}));
215+
}
209216
for (; ; ) {
210217
lock.lock();
211218
try {
212219
openedStreams.forEach((streamId, stream) -> {
213220
LOGGER.info("trigger stream close, streamId={}", streamId);
214-
stream.close(force);
221+
stream.close(forceCloseMark);
215222
});
216223
if (openedStreams.isEmpty() && openingStreams.isEmpty() && closingStreams.isEmpty()) {
217224
LOGGER.info("all streams are closed");
@@ -227,11 +234,10 @@ private void closeStreams(boolean force) {
227234

228235
public void forceClose() {
229236
markClosed();
230-
closeStreams(true);
237+
runInLock(() -> forceCloseMark = true);
238+
closeStreams();
231239
}
232240

233-
234-
235241
private void checkState() {
236242
if (closed) {
237243
throw new IllegalStateException("S3StreamClient is already closed");
@@ -321,15 +327,19 @@ public CompletableFuture<Void> close() {
321327
public CompletableFuture<Void> close(boolean force) {
322328
return runInLock(() -> {
323329
CompletableFuture<Stream> cf = new CompletableFuture<>();
324-
openedStreams.remove(streamId(), this);
325-
closingStreams.put(streamId(), cf);
326-
return stream.close(force).whenComplete((v, e) -> runInLock(() -> {
327-
cf.complete(StreamWrapper.this);
328-
closingStreams.remove(streamId(), cf);
329-
for (StreamLifeCycleListener listener : streamLifeCycleListeners) {
330-
listener.onStreamClose(streamId());
331-
}
332-
}));
330+
long streamId = streamId();
331+
if (openedStreams.remove(streamId, this)) {
332+
closingStreams.put(streamId, this);
333+
return stream.close(force).whenComplete((v, e) -> runInLock(() -> {
334+
cf.complete(StreamWrapper.this);
335+
closingStreams.remove(streamId, this);
336+
for (StreamLifeCycleListener listener : streamLifeCycleListeners) {
337+
listener.onStreamClose(streamId);
338+
}
339+
}));
340+
} else {
341+
return stream.close(force);
342+
}
333343
});
334344
}
335345

@@ -338,10 +348,10 @@ public CompletableFuture<Void> destroy() {
338348
return runInLock(() -> {
339349
CompletableFuture<Stream> cf = new CompletableFuture<>();
340350
openedStreams.remove(streamId(), this);
341-
closingStreams.put(streamId(), cf);
351+
closingStreams.put(streamId(), this);
342352
return stream.destroy().whenComplete((v, e) -> runInLock(() -> {
343353
cf.complete(StreamWrapper.this);
344-
closingStreams.remove(streamId(), cf);
354+
closingStreams.remove(streamId(), this);
345355
}));
346356
});
347357
}

0 commit comments

Comments
 (0)