Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1224,9 +1224,21 @@ managedLedgerCacheEvictionWatermark=0.9
# Configure the cache eviction interval in milliseconds for the managed ledger cache
managedLedgerCacheEvictionIntervalMs=10

# All entries that have stayed in cache for more than the configured time, will be evicted
# All entries that have stayed in cache for more than the configured time will be evicted.
# When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries
# that have reached their expected read count (i.e., entries that have been read by all
# anticipated consumers). Entries with a positive expected read count use
# managedLedgerCacheEvictionTimeThresholdMillisMax instead.
managedLedgerCacheEvictionTimeThresholdMillis=1000

# Maximum time-to-live in cache for entries that still have pending expected reads.
# This setting is only effective when cacheEvictionByExpectedReadCount is enabled.
# Entries with a positive expected read count (indicating they are anticipated to be
# read by additional consumers) will be retained in cache up to this longer threshold,
# helping avoid cache misses for scenarios like Key_Shared subscription replays,
# catch-up reads, and consumers that temporarily fall behind the tail.
managedLedgerCacheEvictionTimeThresholdMillisMax=5000

# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'
# and thus should be set as inactive.
managedLedgerCursorBackloggedThreshold=1000
Expand Down
14 changes: 13 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -795,9 +795,21 @@ managedLedgerCacheEvictionWatermark=0.9
# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0

# All entries that have stayed in cache for more than the configured time, will be evicted
# All entries that have stayed in cache for more than the configured time will be evicted.
# When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries
# that have reached their expected read count (i.e., entries that have been read by all
# anticipated consumers). Entries with a positive expected read count use
# managedLedgerCacheEvictionTimeThresholdMillisMax instead.
managedLedgerCacheEvictionTimeThresholdMillis=1000

# Maximum time-to-live in cache for entries that still have pending expected reads.
# This setting is only effective when cacheEvictionByExpectedReadCount is enabled.
# Entries with a positive expected read count (indicating they are anticipated to be
# read by additional consumers) will be retained in cache up to this longer threshold,
# helping avoid cache misses for scenarios like Key_Shared subscription replays,
# catch-up reads, and consumers that temporarily fall behind the tail.
managedLedgerCacheEvictionTimeThresholdMillisMax=5000

# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'
# and thus should be set as inactive.
managedLedgerCursorBackloggedThreshold=1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ public interface Entry {
*/
boolean release();

/**
* Returns the read count handler associated with this entry.
* Managed Ledger entry implementations (such as EntryImpl) should override this method to return
* the read count handler associated with the entry.
* This handler is used to track how many times the entry has been read and to manage
* the eviction of entries from the broker cache based on their expected read count.
* @return the read count handler for this entry; this default implementation always
* returns {@code null}, so callers must be prepared to handle a {@code null} result
*/
default EntryReadCountHandler getReadCountHandler() {
return null;
}

/**
* Check if this entry is for the given Position.
* @param position the position to check against
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

/**
 * Handle for the expected read count of an entry.
 */
public interface EntryReadCountHandler {
    /**
     * @return the current expected read count.
     */
    int getExpectedReadCount();

    /**
     * Increments the expected read count.
     */
    void incrementExpectedReadCount();

    /**
     * Marks that the entry has been read.
     * NOTE(review): presumably this decrements the expected read count - confirm against the implementation.
     */
    void markRead();

    /**
     * Tells whether there are still expected reads.
     *
     * @return {@code true} when {@link #getExpectedReadCount()} is strictly positive, {@code false} otherwise.
     */
    default boolean hasExpectedReads() {
        return getExpectedReadCount() > 0;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
@Getter
@Setter
private boolean cacheEvictionByExpectedReadCount = true;
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,41 @@ void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFut
EntryCacheManager getEntryCacheManager();

/**
* update cache evictionTimeThreshold.
*
* @param cacheEvictionTimeThresholdNanos time threshold for eviction.
* Updates the cache eviction time threshold dynamically. Similar to
* {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillis(long)},
* but the value is in nanoseconds. This unit inconsistency is kept for backwards compatibility.
* @param cacheEvictionTimeThresholdNanos time threshold for eviction, in nanoseconds.
*/
void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);

/**
* Returns the time threshold for eviction. Similar to
* {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillis(long)},
* but the value is in nanoseconds.
* @return time threshold for eviction, in nanoseconds.
* */
*/
long getCacheEvictionTimeThreshold();

/**
* Updates the maximum cache eviction time threshold. Similar to
* {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillisMax(long)},
* but the value is in nanoseconds. Nanoseconds are used here, rather than milliseconds,
* to stay consistent with {@link #updateCacheEvictionTimeThreshold(long)}.
* <p>The default implementation is a no-op; implementations that support a maximum
* threshold are expected to override it.
* @param cacheEvictionTimeThresholdMaxNanos maximum time threshold for eviction, in nanoseconds.
*/
default void updateCacheEvictionTimeThresholdMax(long cacheEvictionTimeThresholdMaxNanos) {
// Default implementation does nothing for backwards compatibility of the ManagedLedgerFactory interface.
// Implementations can override this method to provide specific behavior.
}

/**
 * Returns the maximum time threshold for eviction. Similar to
 * {@link ManagedLedgerFactoryConfig#setCacheEvictionTimeThresholdMillisMax(long)},
 * but the value is in nanoseconds.
 *
 * <p>This method has a default implementation for backwards compatibility of the
 * ManagedLedgerFactory interface, mirroring {@link #updateCacheEvictionTimeThresholdMax(long)}:
 * implementations that do not maintain a separate maximum threshold report the regular
 * threshold from {@link #getCacheEvictionTimeThreshold()}. Implementations that support a
 * separate maximum threshold should override this method.
 *
 * @return max time threshold for eviction, in nanoseconds.
 */
default long getCacheEvictionTimeThresholdMax() {
    // Fall back to the regular threshold so existing implementers keep compiling and
    // behave as if no extended retention were configured.
    return getCacheEvictionTimeThreshold();
}

/**
* @return properties of this managedLedger.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,24 @@ public class ManagedLedgerFactoryConfig {
private long cacheEvictionIntervalMs = 10;

/**
* All entries that have stayed in cache for more than the configured time, will be evicted.
* All entries that have stayed in cache for more than the configured time will be evicted.
* When cacheEvictionByExpectedReadCount is enabled, this threshold applies only to entries
* that have reached their expected read count (i.e., entries that have been read by all
* anticipated consumers). Entries with a positive expected read count use
* managedLedgerCacheEvictionTimeThresholdMillisMax instead.
*/
private long cacheEvictionTimeThresholdMillis = 1000;

/**
* Maximum time-to-live in cache for entries that still have pending expected reads.
* This setting is only effective when cacheEvictionByExpectedReadCount is enabled.
* Entries with a positive expected read count (indicating they are anticipated to be
* read by additional consumers) will be retained in cache up to this longer threshold,
* helping avoid cache misses for scenarios like Key_Shared subscription replays,
* catch-up reads, and consumers that temporarily fall behind the tail.
*/
private long cacheEvictionTimeThresholdMillisMax = 5000;

/**
* Whether we should make a copy of the entry payloads when inserting in cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,13 @@
* An Entry that is also reference counted.
*/
public interface ReferenceCountedEntry extends Entry, ReferenceCounted {
    /**
     * @return the read count handler of this entry; {@link #hasExpectedReads()} tolerates a
     * {@code null} result.
     */
    EntryReadCountHandler getReadCountHandler();

    /**
     * Tells whether this entry still has expected reads.
     *
     * @return the handler's {@link EntryReadCountHandler#hasExpectedReads()} result, or
     * {@code false} when {@link #getReadCountHandler()} returns {@code null}.
     */
    default boolean hasExpectedReads() {
        EntryReadCountHandler handler = getReadCountHandler();
        return handler != null && handler.hasExpectedReads();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.EntryReadCountHandler;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
Expand All @@ -48,20 +49,24 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long entryId;
private Position position;
ByteBuf data;
private EntryReadCountHandler readCountHandler;
private boolean decreaseReadCountOnRelease = true;

private Runnable onDeallocate;

public static EntryImpl create(LedgerEntry ledgerEntry) {
/**
 * Builds an entry from a BookKeeper {@link LedgerEntry}.
 * The instance is taken from {@code RECYCLER}, the ledger entry's buffer is retained and used
 * as the entry data, and the reference count is set to 1.
 *
 * @param ledgerEntry source of the ledger id, entry id and data buffer
 * @param expectedReadCount value passed to {@code EntryReadCountHandlerImpl.maybeCreate} to obtain
 *                          the read count handler
 * @return the initialized entry
 */
public static EntryImpl create(LedgerEntry ledgerEntry, int expectedReadCount) {
    EntryImpl created = RECYCLER.get();
    created.ledgerId = ledgerEntry.getLedgerId();
    created.entryId = ledgerEntry.getEntryId();
    ByteBuf buffer = ledgerEntry.getEntryBuffer();
    created.data = buffer;
    // The entry keeps its own reference to the ledger entry's buffer.
    buffer.retain();
    created.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount);
    created.setRefCnt(1);
    return created;
}

public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor) {
public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor interceptor,
int expectedReadCount) {
ManagedLedgerInterceptor.PayloadProcessorHandle processorHandle = null;
if (interceptor != null) {
ByteBuf duplicateBuffer = ledgerEntry.getEntryBuffer().retainedDuplicate();
Expand All @@ -74,7 +79,7 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor
duplicateBuffer.release();
}
}
EntryImpl returnEntry = create(ledgerEntry);
EntryImpl returnEntry = create(ledgerEntry, expectedReadCount);
if (processorHandle != null) {
processorHandle.release();
ledgerEntry.close();
Expand All @@ -84,41 +89,66 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor

/**
 * Test-only factory that delegates to {@link #create(long, long, byte[], int)}
 * with an expected read count of 0.
 */
@VisibleForTesting
public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
    final int noExpectedReads = 0;
    return create(ledgerId, entryId, data, noExpectedReads);
}

/**
 * Test-only factory that builds an entry around a byte array.
 * The instance is taken from {@code RECYCLER}, the array is wrapped with
 * {@code Unpooled.wrappedBuffer}, and the reference count is set to 1.
 *
 * @param ledgerId ledger id of the entry
 * @param entryId entry id of the entry
 * @param data payload bytes to wrap
 * @param expectedReadCount value passed to {@code EntryReadCountHandlerImpl.maybeCreate} to obtain
 *                          the read count handler
 * @return the initialized entry
 */
@VisibleForTesting
public static EntryImpl create(long ledgerId, long entryId, byte[] data, int expectedReadCount) {
    EntryImpl created = RECYCLER.get();
    created.ledgerId = ledgerId;
    created.entryId = entryId;
    ByteBuf wrapped = Unpooled.wrappedBuffer(data);
    created.data = wrapped;
    created.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount);
    created.setRefCnt(1);
    return created;
}

/**
 * Delegates to {@link #create(long, long, ByteBuf, int)} with an expected read count of 0.
 */
public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
    final int noExpectedReads = 0;
    return create(ledgerId, entryId, data, noExpectedReads);
}

/**
 * Builds an entry around the given buffer.
 * The instance is taken from {@code RECYCLER}, the buffer is retained (not copied or duplicated),
 * and the reference count is set to 1.
 *
 * @param ledgerId ledger id of the entry
 * @param entryId entry id of the entry
 * @param data payload buffer; retained by this call
 * @param expectedReadCount value passed to {@code EntryReadCountHandlerImpl.maybeCreate} to obtain
 *                          the read count handler
 * @return the initialized entry
 */
public static EntryImpl create(long ledgerId, long entryId, ByteBuf data, int expectedReadCount) {
    EntryImpl created = RECYCLER.get();
    created.ledgerId = ledgerId;
    created.entryId = entryId;
    created.data = data;
    // The entry keeps its own reference to the caller's buffer.
    data.retain();
    created.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount);
    created.setRefCnt(1);
    return created;
}

public static EntryImpl create(Position position, ByteBuf data) {
/**
 * Builds an entry for the given position around the given buffer.
 * The instance is taken from {@code RECYCLER}, the position is re-created through
 * {@code PositionFactory.create}, the buffer is retained (not duplicated), and the reference
 * count is set to 1.
 *
 * @param position position supplying the ledger id and entry id
 * @param data payload buffer; retained by this call
 * @param expectedReadCount value passed to {@code EntryReadCountHandlerImpl.maybeCreate} to obtain
 *                          the read count handler
 * @return the initialized entry
 */
public static EntryImpl create(Position position, ByteBuf data, int expectedReadCount) {
    EntryImpl created = RECYCLER.get();
    created.position = PositionFactory.create(position);
    long ledgerId = position.getLedgerId();
    long entryId = position.getEntryId();
    created.ledgerId = ledgerId;
    created.entryId = entryId;
    created.data = data;
    // The entry keeps its own reference to the caller's buffer.
    data.retain();
    created.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount);
    created.setRefCnt(1);
    return created;
}

/**
 * Builds an entry for the given position whose data is a retained duplicate of the given buffer.
 * The instance is taken from {@code RECYCLER}, the position is re-created through
 * {@code PositionFactory.create}, and the reference count is set to 1.
 *
 * @param position position supplying the ledger id and entry id
 * @param data payload buffer; the entry stores {@code data.retainedDuplicate()}
 * @param expectedReadCount value passed to {@code EntryReadCountHandlerImpl.maybeCreate} to obtain
 *                          the read count handler
 * @return the initialized entry
 */
public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data, int expectedReadCount) {
    EntryImpl created = RECYCLER.get();
    created.position = PositionFactory.create(position);
    long ledgerId = position.getLedgerId();
    long entryId = position.getEntryId();
    created.ledgerId = ledgerId;
    created.entryId = entryId;
    ByteBuf duplicate = data.retainedDuplicate();
    created.data = duplicate;
    created.readCountHandler = EntryReadCountHandlerImpl.maybeCreate(expectedReadCount);
    created.setRefCnt(1);
    return created;
}

public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data) {
/**
 * Builds an entry for the given position whose data is a retained duplicate of the given buffer,
 * attaching the supplied read count handler instance directly instead of creating a new one.
 * The instance is taken from {@code RECYCLER}, the position is re-created through
 * {@code PositionFactory.create}, and the reference count is set to 1.
 *
 * @param position position supplying the ledger id and entry id
 * @param data payload buffer; the entry stores {@code data.retainedDuplicate()}
 * @param entryReadCountHandler handler stored on the entry as-is
 * @return the initialized entry
 */
public static EntryImpl createWithRetainedDuplicate(Position position, ByteBuf data,
                                                    EntryReadCountHandler entryReadCountHandler) {
    EntryImpl created = RECYCLER.get();
    created.position = PositionFactory.create(position);
    long ledgerId = position.getLedgerId();
    long entryId = position.getEntryId();
    created.ledgerId = ledgerId;
    created.entryId = entryId;
    ByteBuf duplicate = data.retainedDuplicate();
    created.data = duplicate;
    created.readCountHandler = entryReadCountHandler;
    created.setRefCnt(1);
    return created;
}
Expand All @@ -130,6 +160,7 @@ public static EntryImpl create(EntryImpl other) {
entry.ledgerId = other.ledgerId;
entry.entryId = other.entryId;
entry.data = other.data.retainedDuplicate();
entry.readCountHandler = other.readCountHandler;
entry.setRefCnt(1);
return entry;
}
Expand All @@ -140,6 +171,7 @@ public static EntryImpl create(Entry other) {
entry.ledgerId = other.getLedgerId();
entry.entryId = other.getEntryId();
entry.data = other.getDataBuffer().retainedDuplicate();
entry.readCountHandler = other.getReadCountHandler();
entry.setRefCnt(1);
return entry;
}
Expand Down Expand Up @@ -227,6 +259,9 @@ public ReferenceCounted touch(Object hint) {

@Override
protected void deallocate() {
if (decreaseReadCountOnRelease && readCountHandler != null) {
readCountHandler.markRead();
}
// This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
if (onDeallocate != null) {
try {
Expand All @@ -240,6 +275,8 @@ protected void deallocate() {
ledgerId = -1;
entryId = -1;
position = null;
readCountHandler = null;
decreaseReadCountOnRelease = true;
recyclerHandle.recycle(this);
}

Expand All @@ -248,6 +285,15 @@ public boolean matchesPosition(Position key) {
return key != null && key.compareTo(ledgerId, entryId) == 0;
}

/**
 * @return the read count handler currently attached to this entry; {@code null} after the
 * entry has been deallocated, since deallocation clears the field.
 */
@Override
public EntryReadCountHandler getReadCountHandler() {
    return this.readCountHandler;
}

/**
 * Controls whether {@code deallocate()} calls {@code markRead()} on the attached read count handler.
 * The flag is reset to {@code true} when the entry is deallocated.
 *
 * @param enabled {@code true} to mark the entry as read on deallocation, {@code false} to skip it
 */
public void setDecreaseReadCountOnRelease(boolean enabled) {
    this.decreaseReadCountOnRelease = enabled;
}

@Override
public String toString() {
return getClass().getName() + "@" + System.identityHashCode(this)
Expand Down
Loading
Loading