From b6868135939e31586702617cb4fe3df38e35368f Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 13:33:43 +0800 Subject: [PATCH 1/6] Fix Offload readHandle cannot close multi times. --- .../apache/pulsar/common/util/FutureUtil.java | 17 +++++++++ .../impl/FileStoreBackedReadHandleImpl.java | 23 ++++++++++-- .../impl/BlobStoreBackedReadHandleImpl.java | 13 +++++-- .../impl/BlobStoreBackedReadHandleImplV2.java | 35 +++++++++++-------- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 6f62589853593..9149aa41627bb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -347,4 +347,21 @@ public static void safeRunAsync(Runnable runnable, return null; }); } + + public static @Nonnull CompletableFuture apply(@Nonnull CompletableFuture s, + @Nonnull CompletableFuture d) { + if (s == d) { + return d; + } + + s.whenComplete((v, t) -> { + if (t == null) { + d.complete(v); + } else { + d.completeExceptionally(t); + } + }); + + return d; + } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 49b2071f5db2c..17079b488adeb 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -36,11 +37,13 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +56,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle { private final LedgerOffloaderStats offloaderStats; private final String managedLedgerName; private final String topicName; + enum State { + Opened, + Closed + } + private volatile State state; + private final AtomicReference> closeFuture = new AtomicReference<>(); private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId, LedgerOffloaderStats offloaderStats, @@ -72,6 +81,7 @@ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader r offloaderStats.recordReadOffloadIndexLatency(topicName, System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS); this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes()); + state = State.Opened; } catch (IOException e) { log.error("Fail to read LedgerMetadata for ledgerId {}", ledgerId); @@ -93,15 +103,18 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); - executor.execute(() -> { + if (closeFuture.compareAndSet(null, promise)) { + executor.execute(() -> { try { reader.close(); + state = State.Closed; promise.complete(null); } catch (IOException t) { promise.completeExceptionally(t); } }); - return promise; + } + return FutureUtil.apply(closeFuture.get(), promise); } @Override @@ -111,6 +124,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } CompletableFuture promise = new CompletableFuture<>(); executor.execute(() -> { + if (state == State.Closed) { + log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", + ledgerId, firstEntry, lastEntry); + promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException()); + return; + } if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > getLastAddConfirmed()) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 5a571bb208e34..e36072df64e69 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.offload.jcloud.impl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; @@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -46,6 +48,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; @@ -66,13 +69,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { .newBuilder() .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS) .build(); + private final AtomicReference> closeFuture = new AtomicReference<>(); enum State { Opened, Closed } - private State state = null; + private volatile State state = null; private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, BackedInputStream inputStream, ExecutorService executor) { @@ -97,7 +101,8 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); - executor.execute(() -> { + if (closeFuture.compareAndSet(null, promise)) { + executor.execute(() -> { try { index.close(); inputStream.close(); @@ -108,7 +113,8 @@ public CompletableFuture closeAsync() { promise.completeExceptionally(t); } }); - return promise; + } + return FutureUtil.apply(closeFuture.get(), promise); } @Override @@ -298,6 +304,7 @@ public static ReadHandle open(ScheduledExecutorService executor, } // for testing + @VisibleForTesting State getState() { return this.state; } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index e40a0a3834c85..cf7a1de3d6aa4 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import lombok.val; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -47,6 +48,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; @@ -60,7 +62,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { private final List inputStreams; private final List dataStreams; private final ExecutorService executor; - private State state = null; + private volatile State state = null; + private final AtomicReference> closeFuture = new AtomicReference<>(); enum State { Opened, @@ -124,21 +127,23 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); - executor.execute(() -> { - try { - for (OffloadIndexBlockV2 indexBlock : indices) { - indexBlock.close(); - } - for (DataInputStream dataStream : dataStreams) { - dataStream.close(); + if (closeFuture.compareAndSet(null, promise)) { + executor.execute(() -> { + try { + for (OffloadIndexBlockV2 indexBlock : indices) { + indexBlock.close(); + } + for (DataInputStream dataStream : dataStreams) { + dataStream.close(); + } + state = State.Closed; + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); } - state = State.Closed; - promise.complete(null); - } catch (IOException t) { - promise.completeExceptionally(t); - } - }); - return promise; + }); + } + return FutureUtil.apply(closeFuture.get(), promise); } @Override From c9f63564947224ce1f9d3149234d811b31c12950 Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 13:50:32 +0800 Subject: [PATCH 2/6] fix code --- .../impl/FileStoreBackedReadHandleImpl.java | 25 +++++++------ .../impl/BlobStoreBackedReadHandleImpl.java | 29 ++++++++------- .../impl/BlobStoreBackedReadHandleImplV2.java | 35 ++++++++++--------- 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 17079b488adeb..141c3c6c53d9c 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -103,18 +103,21 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); - if (closeFuture.compareAndSet(null, promise)) { - executor.execute(() -> { - try { - reader.close(); - state = State.Closed; - promise.complete(null); - } catch (IOException t) { - promise.completeExceptionally(t); - } - }); + + if (!closeFuture.compareAndSet(null, promise)) { + return FutureUtil.apply(closeFuture.get(), promise); } - return FutureUtil.apply(closeFuture.get(), promise); + + executor.execute(() -> { + try { + reader.close(); + state = State.Closed; + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); + } + }); + return promise; } @Override diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index e36072df64e69..ee7cd27f750fa 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -101,20 +101,23 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); - if (closeFuture.compareAndSet(null, promise)) { - executor.execute(() -> { - try { - index.close(); - inputStream.close(); - entryOffsets.invalidateAll(); - state = State.Closed; - promise.complete(null); - } catch (IOException t) { - promise.completeExceptionally(t); - } - }); + + if (!closeFuture.compareAndSet(null, promise)) { + return FutureUtil.apply(closeFuture.get(), promise); } - return FutureUtil.apply(closeFuture.get(), promise); + + executor.execute(() -> { + try { + index.close(); + inputStream.close(); + entryOffsets.invalidateAll(); + state = State.Closed; + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); + } + }); + return promise; } @Override diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index cf7a1de3d6aa4..55f712477b8df 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -127,23 +127,26 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); - if (closeFuture.compareAndSet(null, promise)) { - executor.execute(() -> { - try { - for (OffloadIndexBlockV2 indexBlock : indices) { - indexBlock.close(); - } - for (DataInputStream dataStream : dataStreams) { - dataStream.close(); - } - state = State.Closed; - promise.complete(null); - } catch (IOException t) { - promise.completeExceptionally(t); - } - }); + + if (!closeFuture.compareAndSet(null, promise)) { + return FutureUtil.apply(closeFuture.get(), promise); } - return FutureUtil.apply(closeFuture.get(), promise); + + executor.execute(() -> { + try { + for (OffloadIndexBlockV2 indexBlock : indices) { + indexBlock.close(); + } + for (DataInputStream dataStream : dataStreams) { + dataStream.close(); + } + state = State.Closed; + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); + } + }); + return promise; } @Override From a2eed8db8032a7fb6ac78ad712a695d306f708df Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 15:01:23 +0800 Subject: [PATCH 3/6] address comment --- .../apache/pulsar/common/util/FutureUtil.java | 24 ++++++++++++------- .../impl/FileStoreBackedReadHandleImpl.java | 2 +- .../impl/BlobStoreBackedReadHandleImpl.java | 2 +- .../impl/BlobStoreBackedReadHandleImplV2.java | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 9149aa41627bb..e1a27f7594435 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -348,20 +348,28 @@ public static void safeRunAsync(Runnable runnable, }); } - public static @Nonnull CompletableFuture apply(@Nonnull CompletableFuture s, - @Nonnull CompletableFuture d) { - if (s == d) { - return d; + /** + * Apply source's result to target. + * + * @param source + * @param target + * @return + * @param + */ + public static @Nonnull CompletableFuture completeAfter(@Nonnull CompletableFuture source, + @Nonnull CompletableFuture target) { + if (source == target) { + return target; } - s.whenComplete((v, t) -> { + source.whenComplete((v, t) -> { if (t == null) { - d.complete(v); + target.complete(v); } else { - d.completeExceptionally(t); + target.completeExceptionally(t); } }); - return d; + return target; } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 141c3c6c53d9c..af3c4a6e3b2c3 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -105,7 +105,7 @@ public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); if (!closeFuture.compareAndSet(null, promise)) { - return FutureUtil.apply(closeFuture.get(), promise); + return FutureUtil.completeAfter(closeFuture.get(), promise); } executor.execute(() -> { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index ee7cd27f750fa..cb329c2009108 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -103,7 +103,7 @@ public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); if (!closeFuture.compareAndSet(null, promise)) { - return FutureUtil.apply(closeFuture.get(), promise); + return FutureUtil.completeAfter(closeFuture.get(), promise); } executor.execute(() -> { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 55f712477b8df..2da45b6ff4398 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -129,7 +129,7 @@ public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); if (!closeFuture.compareAndSet(null, promise)) { - return FutureUtil.apply(closeFuture.get(), promise); + return FutureUtil.completeAfter(closeFuture.get(), promise); } executor.execute(() -> { From e0d0bbe23486cddddfebb907d7fff10c4b418d41 Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 15:17:44 +0800 Subject: [PATCH 4/6] address comment --- .../apache/pulsar/common/util/FutureUtil.java | 25 ------------------- .../impl/FileStoreBackedReadHandleImpl.java | 3 +-- .../impl/BlobStoreBackedReadHandleImpl.java | 3 +-- .../impl/BlobStoreBackedReadHandleImplV2.java | 3 +-- 4 files changed, 3 insertions(+), 31 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index e1a27f7594435..6f62589853593 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -347,29 +347,4 @@ public static void safeRunAsync(Runnable runnable, return null; }); } - - /** - * Apply source's result to target. - * - * @param source - * @param target - * @return - * @param - */ - public static @Nonnull CompletableFuture completeAfter(@Nonnull CompletableFuture source, - @Nonnull CompletableFuture target) { - if (source == target) { - return target; - } - - source.whenComplete((v, t) -> { - if (t == null) { - target.complete(v); - } else { - target.completeExceptionally(t); - } - }); - - return target; - } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index af3c4a6e3b2c3..c41ee2f59f9ba 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -43,7 +43,6 @@ import org.apache.hadoop.io.MapFile; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +104,7 @@ public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); if (!closeFuture.compareAndSet(null, promise)) { - return FutureUtil.completeAfter(closeFuture.get(), promise); + return closeFuture.get(); } executor.execute(() -> { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index cb329c2009108..797014d4813df 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -48,7 +48,6 @@ import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; @@ -103,7 +102,7 @@ public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); if (!closeFuture.compareAndSet(null, promise)) { - return FutureUtil.completeAfter(closeFuture.get(), promise); + return closeFuture.get(); } executor.execute(() -> { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 2da45b6ff4398..ce0eddeededc4 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -48,7 +48,6 @@ import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; @@ -129,7 +128,7 @@ public CompletableFuture closeAsync() { CompletableFuture promise = new CompletableFuture<>(); if (!closeFuture.compareAndSet(null, promise)) { - return FutureUtil.completeAfter(closeFuture.get(), promise); + return closeFuture.get(); } executor.execute(() -> { From b286d0feb06556c1ea241366f53c616893428207 Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 15:42:24 +0800 Subject: [PATCH 5/6] address comment --- .../filesystem/impl/FileStoreBackedReadHandleImpl.java | 5 ++--- .../offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java | 5 ++--- .../jcloud/impl/BlobStoreBackedReadHandleImplV2.java | 6 +++--- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index c41ee2f59f9ba..91e7e902eab8a 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -101,12 +101,11 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { - CompletableFuture promise = new CompletableFuture<>(); - - if (!closeFuture.compareAndSet(null, promise)) { + if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) { return closeFuture.get(); } + CompletableFuture promise = closeFuture.get(); executor.execute(() -> { try { reader.close(); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 797014d4813df..5346be6a044c8 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -99,12 +99,11 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { - CompletableFuture promise = new CompletableFuture<>(); - - if (!closeFuture.compareAndSet(null, promise)) { + if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) { return closeFuture.get(); } + CompletableFuture promise = closeFuture.get(); executor.execute(() -> { try { index.close(); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index ce0eddeededc4..3112970e0a1fd 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -125,13 +125,13 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { - CompletableFuture promise = new CompletableFuture<>(); - - if (!closeFuture.compareAndSet(null, promise)) { + if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) { return closeFuture.get(); } + CompletableFuture promise = closeFuture.get(); executor.execute(() -> { + try { for (OffloadIndexBlockV2 indexBlock : indices) { indexBlock.close(); From bf332a3d2c1fe6c6aec3455dad10a9ea61259018 Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 19:42:18 +0800 Subject: [PATCH 6/6] address comment --- .../offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 3112970e0a1fd..53d96e08abf5e 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -131,7 +131,6 @@ public CompletableFuture closeAsync() { CompletableFuture promise = closeFuture.get(); executor.execute(() -> { - try { for (OffloadIndexBlockV2 indexBlock : indices) { indexBlock.close(); @@ -150,7 +149,9 @@ public CompletableFuture closeAsync() { @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { - log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); + if (log.isDebugEnabled()) { + log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); + } CompletableFuture promise = new CompletableFuture<>(); executor.execute(() -> { if (state == State.Closed) {