Skip to content

Commit 127a6f0

Browse files
dao-junmukesh-ctds
authored andcommitted
[fix][offload] Fix Offload readHandle cannot close multi times. (apache#22162)
(cherry picked from commit e25c7f0) (cherry picked from commit 62de4a5)
1 parent 80da36e commit 127a6f0

File tree

3 files changed

+59
-23
lines changed

3 files changed

+59
-23
lines changed

tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.ScheduledExecutorService;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicReference;
3031
import org.apache.bookkeeper.client.BKException;
3132
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
3233
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -36,6 +37,7 @@
3637
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
3738
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
3839
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
40+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3941
import org.apache.hadoop.io.BytesWritable;
4042
import org.apache.hadoop.io.LongWritable;
4143
import org.apache.hadoop.io.MapFile;
@@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
5355
private final LedgerOffloaderStats offloaderStats;
5456
private final String managedLedgerName;
5557
private final String topicName;
58+
enum State {
59+
Opened,
60+
Closed
61+
}
62+
private volatile State state;
63+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
5664

5765
private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
5866
LedgerOffloaderStats offloaderStats,
@@ -72,6 +80,7 @@ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader r
7280
offloaderStats.recordReadOffloadIndexLatency(topicName,
7381
System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
7482
this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
83+
state = State.Opened;
7584
} catch (IOException e) {
7685
log.error("Fail to read LedgerMetadata for ledgerId {}",
7786
ledgerId);
@@ -92,15 +101,20 @@ public LedgerMetadata getLedgerMetadata() {
92101

93102
@Override
94103
public CompletableFuture<Void> closeAsync() {
95-
CompletableFuture<Void> promise = new CompletableFuture<>();
104+
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
105+
return closeFuture.get();
106+
}
107+
108+
CompletableFuture<Void> promise = closeFuture.get();
96109
executor.execute(() -> {
97-
try {
98-
reader.close();
99-
promise.complete(null);
100-
} catch (IOException t) {
101-
promise.completeExceptionally(t);
102-
}
103-
});
110+
try {
111+
reader.close();
112+
state = State.Closed;
113+
promise.complete(null);
114+
} catch (IOException t) {
115+
promise.completeExceptionally(t);
116+
}
117+
});
104118
return promise;
105119
}
106120

@@ -111,6 +125,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
111125
}
112126
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
113127
executor.execute(() -> {
128+
if (state == State.Closed) {
129+
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
130+
ledgerId, firstEntry, lastEntry);
131+
promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
132+
return;
133+
}
114134
if (firstEntry > lastEntry
115135
|| firstEntry < 0
116136
|| lastEntry > getLastAddConfirmed()) {

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.cache.Cache;
2223
import com.google.common.cache.CacheBuilder;
2324
import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicReference;
3335
import org.apache.bookkeeper.client.BKException;
3436
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
3537
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -67,13 +69,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge
6769
.newBuilder()
6870
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
6971
.build();
72+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
7073

7174
enum State {
7275
Opened,
7376
Closed
7477
}
7578

76-
private State state = null;
79+
private volatile State state = null;
7780

7881
private long lastAccessTimestamp = System.currentTimeMillis();
7982

@@ -99,18 +102,22 @@ public LedgerMetadata getLedgerMetadata() {
99102

100103
@Override
101104
public CompletableFuture<Void> closeAsync() {
102-
CompletableFuture<Void> promise = new CompletableFuture<>();
105+
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
106+
return closeFuture.get();
107+
}
108+
109+
CompletableFuture<Void> promise = closeFuture.get();
103110
executor.execute(() -> {
104-
try {
105-
index.close();
106-
inputStream.close();
107-
entryOffsets.invalidateAll();
108-
state = State.Closed;
109-
promise.complete(null);
110-
} catch (IOException t) {
111-
promise.completeExceptionally(t);
112-
}
113-
});
111+
try {
112+
index.close();
113+
inputStream.close();
114+
entryOffsets.invalidateAll();
115+
state = State.Closed;
116+
promise.complete(null);
117+
} catch (IOException t) {
118+
promise.completeExceptionally(t);
119+
}
120+
});
114121
return promise;
115122
}
116123

@@ -303,6 +310,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
303310
}
304311

305312
// for testing
313+
@VisibleForTesting
306314
State getState() {
307315
return this.state;
308316
}

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.ExecutorService;
3131
import java.util.concurrent.ScheduledExecutorService;
3232
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicReference;
3334
import lombok.val;
3435
import org.apache.bookkeeper.client.BKException;
3536
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
6061
private final List<BackedInputStream> inputStreams;
6162
private final List<DataInputStream> dataStreams;
6263
private final ExecutorService executor;
63-
private State state = null;
64+
private volatile State state = null;
65+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
6466

6567
enum State {
6668
Opened,
@@ -123,7 +125,11 @@ public LedgerMetadata getLedgerMetadata() {
123125

124126
@Override
125127
public CompletableFuture<Void> closeAsync() {
126-
CompletableFuture<Void> promise = new CompletableFuture<>();
128+
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
129+
return closeFuture.get();
130+
}
131+
132+
CompletableFuture<Void> promise = closeFuture.get();
127133
executor.execute(() -> {
128134
try {
129135
for (OffloadIndexBlockV2 indexBlock : indices) {
@@ -143,7 +149,9 @@ public CompletableFuture<Void> closeAsync() {
143149

144150
@Override
145151
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
146-
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
152+
if (log.isDebugEnabled()) {
153+
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
154+
}
147155
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
148156
executor.execute(() -> {
149157
if (state == State.Closed) {

0 commit comments

Comments
 (0)