Skip to content

Commit 8ce67b9

Browse files
authored
[refactor][ml] Replace cache eviction algorithm with centralized removal queue and job (#24363)
1 parent cf3d7d1 commit 8ce67b9

34 files changed

+2315
-1450
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,13 @@ public interface Entry {
6666
* of data reached to 0).
6767
*/
6868
boolean release();
69+
70+
/**
71+
* Check if this entry is for the given Position.
72+
* @param position the position to check against
73+
* @return true if the entry matches the position, false otherwise
74+
*/
75+
default boolean matchesPosition(Position position) {
76+
return position != null && position.compareTo(getLedgerId(), getEntryId()) == 0;
77+
}
6978
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger;
2020

21+
import java.util.Objects;
2122
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
2223

2324
/**
@@ -48,12 +49,17 @@ public static Position create(long ledgerId, long entryId) {
4849
}
4950

5051
/**
51-
* Create a new position.
52+
* Create a new position or returns the other instance if it's immutable.
5253
*
5354
* @param other other position
5455
* @return new position
5556
*/
5657
public static Position create(Position other) {
58+
Objects.requireNonNull(other, "Position cannot be null");
59+
if (other instanceof ImmutablePositionImpl) {
60+
// Return the same instance if it's already an ImmutablePositionImpl
61+
return other;
62+
}
5763
return new ImmutablePositionImpl(other);
5864
}
5965
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java renamed to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReferenceCountedEntry.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,13 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.bookkeeper.mledger.impl.cache;
19+
package org.apache.bookkeeper.mledger;
2020

21-
import java.util.List;
21+
import io.netty.util.ReferenceCounted;
2222

2323
/**
24-
* Cache eviction policy abstraction interface.
25-
*
24+
* An Entry that is also reference counted.
2625
*/
27-
public interface EntryCacheEvictionPolicy {
28-
/**
29-
* Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches.
30-
*
31-
* @param caches
32-
* the list of caches to consider
33-
* @param sizeToFree
34-
* the minimum size in bytes to be freed
35-
*/
36-
void doEviction(List<EntryCache> caches, long sizeToFree);
26+
public interface ReferenceCountedEntry extends Entry, ReferenceCounted {
27+
3728
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@
2525
import io.netty.util.Recycler.Handle;
2626
import io.netty.util.ReferenceCounted;
2727
import org.apache.bookkeeper.client.api.LedgerEntry;
28+
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
2829
import org.apache.bookkeeper.mledger.Entry;
2930
import org.apache.bookkeeper.mledger.Position;
3031
import org.apache.bookkeeper.mledger.PositionFactory;
32+
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
33+
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
3134
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
32-
import org.apache.bookkeeper.mledger.util.RangeCache;
3335

34-
public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>,
35-
RangeCache.ValueWithKeyValidation<Position> {
36+
public final class EntryImpl extends AbstractCASReferenceCounted
37+
implements ReferenceCountedEntry, Comparable<EntryImpl> {
3638

3739
private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
3840
@Override
@@ -42,7 +44,6 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
4244
};
4345

4446
private final Handle<EntryImpl> recyclerHandle;
45-
private long timestamp;
4647
private long ledgerId;
4748
private long entryId;
4849
private Position position;
@@ -52,7 +53,6 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
5253

5354
public static EntryImpl create(LedgerEntry ledgerEntry) {
5455
EntryImpl entry = RECYCLER.get();
55-
entry.timestamp = System.nanoTime();
5656
entry.ledgerId = ledgerEntry.getLedgerId();
5757
entry.entryId = ledgerEntry.getEntryId();
5858
entry.data = ledgerEntry.getEntryBuffer();
@@ -61,10 +61,30 @@ public static EntryImpl create(LedgerEntry ledgerEntry) {
6161
return entry;
6262
}
6363

64+
public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
65+
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
66+
if (interceptor != null) {
67+
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
68+
processorHandle = interceptor
69+
.processPayloadBeforeEntryCache(duplicateBuffer);
70+
if (processorHandle != null) {
71+
ledgerEntry = LedgerEntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
72+
ledgerEntry.getLength(), processorHandle.getProcessedPayload());
73+
} else {
74+
duplicateBuffer.release();
75+
}
76+
}
77+
EntryImpl returnEntry = create(ledgerEntry);
78+
if (processorHandle != null) {
79+
processorHandle.release();
80+
ledgerEntry.close();
81+
}
82+
return returnEntry;
83+
}
84+
6485
@VisibleForTesting
6586
public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
6687
EntryImpl entry = RECYCLER.get();
67-
entry.timestamp = System.nanoTime();
6888
entry.ledgerId = ledgerId;
6989
entry.entryId = entryId;
7090
entry.data = Unpooled.wrappedBuffer(data);
@@ -74,7 +94,6 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
7494

7595
public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
7696
EntryImpl entry = RECYCLER.get();
77-
entry.timestamp = System.nanoTime();
7897
entry.ledgerId = ledgerId;
7998
entry.entryId = entryId;
8099
entry.data = data;
@@ -85,7 +104,7 @@ public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
85104

86105
public static EntryImpl create(Position position, ByteBuf data) {
87106
EntryImpl entry = RECYCLER.get();
88-
entry.timestamp = System.nanoTime();
107+
entry.position = PositionFactory.create(position);
89108
entry.ledgerId = position.getLedgerId();
90109
entry.entryId = position.getEntryId();
91110
entry.data = data;
@@ -94,16 +113,37 @@ public static EntryImpl create(Position position, ByteBuf data) {
94113
return entry;
95114
}
96115

116+
public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data) {
117+
EntryImpl entry = RECYCLER.get();
118+
entry.position = PositionFactory.create(position);
119+
entry.ledgerId = position.getLedgerId();
120+
entry.entryId = position.getEntryId();
121+
entry.data = data.retainedDuplicate();
122+
entry.setRefCnt(1);
123+
return entry;
124+
}
125+
97126
public static EntryImpl create(EntryImpl other) {
98127
EntryImpl entry = RECYCLER.get();
99-
entry.timestamp = System.nanoTime();
128+
// handle case where other.position is null due to lazy initialization
129+
entry.position = other.position != null ? PositionFactory.create(other.position) : null;
100130
entry.ledgerId = other.ledgerId;
101131
entry.entryId = other.entryId;
102132
entry.data = other.data.retainedDuplicate();
103133
entry.setRefCnt(1);
104134
return entry;
105135
}
106136

137+
public static EntryImpl create(Entry other) {
138+
EntryImpl entry = RECYCLER.get();
139+
entry.position = PositionFactory.create(other.getPosition());
140+
entry.ledgerId = other.getLedgerId();
141+
entry.entryId = other.getEntryId();
142+
entry.data = other.getDataBuffer().retainedDuplicate();
143+
entry.setRefCnt(1);
144+
return entry;
145+
}
146+
107147
private EntryImpl(Recycler.Handle<EntryImpl> recyclerHandle) {
108148
this.recyclerHandle = recyclerHandle;
109149
}
@@ -124,10 +164,6 @@ public void onDeallocate(Runnable r) {
124164
}
125165
}
126166

127-
public long getTimestamp() {
128-
return timestamp;
129-
}
130-
131167
@Override
132168
public ByteBuf getDataBuffer() {
133169
return data;
@@ -201,15 +237,20 @@ protected void deallocate() {
201237
}
202238
data.release();
203239
data = null;
204-
timestamp = -1;
205240
ledgerId = -1;
206241
entryId = -1;
207242
position = null;
208243
recyclerHandle.recycle(this);
209244
}
210245

211246
@Override
212-
public boolean matchesKey(Position key) {
213-
return key.compareTo(ledgerId, entryId) == 0;
247+
public boolean matchesPosition(Position key) {
248+
return key != null && key.compareTo(ledgerId, entryId) == 0;
249+
}
250+
251+
@Override
252+
public String toString() {
253+
return getClass().getName() + "@" + System.identityHashCode(this)
254+
+ "{ledgerId=" + ledgerId + ", entryId=" + entryId + '}';
214255
}
215-
}
256+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
2222
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
2323
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
24+
import com.google.common.annotations.VisibleForTesting;
2425
import com.google.common.base.Predicates;
2526
import com.google.common.collect.BoundType;
2627
import com.google.common.collect.Maps;
@@ -118,6 +119,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
118119
private final ManagedLedgerFactoryConfig config;
119120
@Getter
120121
protected final OrderedScheduler scheduledExecutor;
122+
@Getter
121123
private final ScheduledExecutorService cacheEvictionExecutor;
122124

123125
@Getter
@@ -147,6 +149,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
147149
*/
148150
@Getter
149151
private boolean metadataServiceAvailable;
152+
private final ManagedLedgerConfig defaultManagedLedgerConfig;
150153

151154
private static class PendingInitializeManagedLedger {
152155

@@ -170,7 +173,8 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi
170173
ManagedLedgerFactoryConfig config)
171174
throws Exception {
172175
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
173-
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
176+
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(),
177+
new ManagedLedgerConfig());
174178
}
175179

176180
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
@@ -181,15 +185,23 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper
181185
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
182186
ManagedLedgerFactoryConfig config)
183187
throws Exception {
184-
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config);
188+
this(metadataStore, bookKeeper, config, new ManagedLedgerConfig());
189+
}
190+
191+
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
192+
ManagedLedgerFactoryConfig config, ManagedLedgerConfig defaultManagedLedgerConfig)
193+
throws Exception {
194+
this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper),
195+
false /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(),
196+
defaultManagedLedgerConfig);
185197
}
186198

187199
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
188200
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
189201
ManagedLedgerFactoryConfig config)
190202
throws Exception {
191203
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
192-
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
204+
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop(), new ManagedLedgerConfig());
193205
}
194206

195207
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -198,15 +210,17 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
198210
OpenTelemetry openTelemetry)
199211
throws Exception {
200212
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
201-
config, statsLogger, openTelemetry);
213+
config, statsLogger, openTelemetry, new ManagedLedgerConfig());
202214
}
203215

204216
private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
205217
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
206218
boolean isBookkeeperManaged,
207219
ManagedLedgerFactoryConfig config,
208220
StatsLogger statsLogger,
209-
OpenTelemetry openTelemetry) throws Exception {
221+
OpenTelemetry openTelemetry,
222+
ManagedLedgerConfig defaultManagedLedgerConfig) throws Exception {
223+
this.defaultManagedLedgerConfig = defaultManagedLedgerConfig;
210224
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
211225
config.getCompressionConfigForManagedLedgerInfo();
212226
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -303,17 +317,60 @@ private synchronized void refreshStats() {
303317
lastStatTimestamp = now;
304318
}
305319

306-
private synchronized void doCacheEviction() {
320+
@VisibleForTesting
321+
public synchronized void doCacheEviction() {
307322
long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;
323+
entryCacheManager.doCacheEviction(maxTimestamp);
324+
}
308325

309-
ledgers.values().forEach(mlfuture -> {
310-
if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
311-
ManagedLedgerImpl ml = mlfuture.getNow(null);
312-
if (ml != null) {
313-
ml.doCacheEviction(maxTimestamp);
314-
}
326+
/**
327+
* Waits for all pending cache evictions based on total cache size or entry TTL to complete.
328+
* This is for testing purposes only, so that we can ensure all cache evictions are done before proceeding with
329+
* further operations. Please notice that Managed Ledger level cache eviction is not handled here. There's
330+
* a similar method {@link ManagedLedgerImpl#waitForPendingCacheEvictions()} that waits for pending cache evictions
331+
* at the Managed Ledger level.
332+
*/
333+
@VisibleForTesting
334+
public void waitForPendingCacheEvictions() {
335+
try {
336+
cacheEvictionExecutor.submit(() -> {
337+
// no-op
338+
}).get();
339+
} catch (InterruptedException e) {
340+
Thread.currentThread().interrupt();
341+
} catch (ExecutionException e) {
342+
throw new RuntimeException(e);
343+
}
344+
}
345+
346+
/**
347+
* Blocks the pending cache evictions until the returned runnable is called.
348+
* This is for testing purposes only, so that asynchronous cache evictions can be blocked for consistent
349+
* test results.
350+
*
351+
* @return
352+
* @throws InterruptedException
353+
*/
354+
@VisibleForTesting
355+
public Runnable blockPendingCacheEvictions() throws InterruptedException {
356+
CountDownLatch blockedLatch = new CountDownLatch(1);
357+
CountDownLatch releaseLatch = new CountDownLatch(1);
358+
cacheEvictionExecutor.execute(() -> {
359+
blockedLatch.countDown();
360+
try {
361+
releaseLatch.await();
362+
} catch (InterruptedException e) {
363+
Thread.currentThread().interrupt();
315364
}
316365
});
366+
blockedLatch.await();
367+
return () -> {
368+
if (releaseLatch.getCount() == 0) {
369+
throw new IllegalStateException("Releasing should only be called once");
370+
}
371+
releaseLatch.countDown();
372+
waitForPendingCacheEvictions();
373+
};
317374
}
318375

319376
/**
@@ -329,7 +386,7 @@ public Map<String, ManagedLedger> getManagedLedgers() {
329386

330387
@Override
331388
public ManagedLedger open(String name) throws InterruptedException, ManagedLedgerException {
332-
return open(name, new ManagedLedgerConfig());
389+
return open(name, defaultManagedLedgerConfig);
333390
}
334391

335392
@Override
@@ -365,7 +422,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
365422

366423
@Override
367424
public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) {
368-
asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx);
425+
asyncOpen(name, defaultManagedLedgerConfig, callback, null, ctx);
369426
}
370427

371428
@Override

0 commit comments

Comments
 (0)