Skip to content

Commit 5cc299d

Browse files
lhotaribahetimansi
authored andcommitted
[fix][ml] Fix issues in estimateEntryCountBySize (apache#24089)
(cherry picked from commit a44b2cf) (cherry picked from commit 50754a6)
1 parent 53f93fe commit 5cc299d

File tree

6 files changed

+501
-72
lines changed

6 files changed

+501
-72
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
22+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.DEFAULT_ESTIMATED_ENTRY_SIZE;
23+
import java.util.Collection;
24+
import java.util.Map;
25+
import java.util.NavigableMap;
26+
import org.apache.bookkeeper.client.LedgerHandle;
27+
import org.apache.bookkeeper.mledger.Position;
28+
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
29+
30+
class EntryCountEstimator {
31+
// Prevent instantiation, this is a utility class with only static methods
32+
private EntryCountEstimator() {
33+
}
34+
35+
/**
36+
* Estimates the number of entries that can be read within the specified byte size starting from the given position
37+
* in the ledger.
38+
*
39+
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
40+
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
41+
* @param readPosition the position in the ledger from where to start reading
42+
* @param ml the {@link ManagedLedgerImpl} instance to use for accessing ledger information
43+
* @return the estimated number of entries that can be read
44+
*/
45+
static int estimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
46+
ManagedLedgerImpl ml) {
47+
LedgerHandle currentLedger = ml.getCurrentLedger();
48+
// currentLedger is null in ReadOnlyManagedLedgerImpl
49+
Long lastLedgerId = currentLedger != null ? currentLedger.getId() : null;
50+
long lastLedgerTotalSize = ml.getCurrentLedgerSize();
51+
long lastLedgerTotalEntries = ml.getCurrentLedgerEntries();
52+
return internalEstimateEntryCountByBytesSize(maxEntries, maxSizeBytes, readPosition, ml.getLedgersInfo(),
53+
lastLedgerId, lastLedgerTotalEntries, lastLedgerTotalSize);
54+
}
55+
56+
/**
57+
* Internal method to estimate the number of entries that can be read within the specified byte size.
58+
* This method is used for unit testing to validate the logic without directly accessing {@link ManagedLedgerImpl}.
59+
*
60+
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
61+
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
62+
* @param readPosition the position in the ledger from where to start reading
63+
* @param ledgersInfo a map of ledger ID to {@link MLDataFormats.ManagedLedgerInfo.LedgerInfo} containing
64+
* metadata for ledgers
65+
* @param lastLedgerId the ID of the last active ledger in the managed ledger
66+
* @param lastLedgerTotalEntries the total number of entries in the last active ledger
67+
* @param lastLedgerTotalSize the total size in bytes of the last active ledger
68+
* @return the estimated number of entries that can be read
69+
*/
70+
static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
71+
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
72+
ledgersInfo,
73+
Long lastLedgerId, long lastLedgerTotalEntries,
74+
long lastLedgerTotalSize) {
75+
if (maxSizeBytes <= 0) {
76+
// If the specified maximum size is invalid (e.g., non-positive), return 0
77+
return 0;
78+
}
79+
80+
// If the maximum size is Long.MAX_VALUE, return the maximum number of entries
81+
if (maxSizeBytes == Long.MAX_VALUE) {
82+
return maxEntries;
83+
}
84+
85+
// Adjust the read position to ensure it falls within the valid range of available ledgers.
86+
// This handles special cases such as EARLIEST and LATEST positions by resetting them
87+
// to the first available ledger or the last active ledger, respectively.
88+
if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
89+
readPosition = PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
90+
} else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
91+
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
92+
readPosition =
93+
PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
94+
} else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
95+
readPosition = PositionImpl.get(ledgersInfo.firstKey(), 0);
96+
}
97+
98+
long estimatedEntryCount = 0;
99+
long remainingBytesSize = maxSizeBytes;
100+
// Start with a default estimated average size per entry, including any overhead
101+
long currentAvgSize = DEFAULT_ESTIMATED_ENTRY_SIZE + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
102+
// Get a collection of ledger info starting from the read position
103+
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
104+
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
105+
106+
// calculate the estimated entry count based on the remaining bytes and ledger metadata
107+
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
108+
if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
109+
// Stop processing if there are no more bytes remaining to allocate for entries
110+
// or if the estimated entry count exceeds the maximum allowed entries
111+
break;
112+
}
113+
long ledgerId = ledgerInfo.getLedgerId();
114+
long ledgerTotalSize = ledgerInfo.getSize();
115+
long ledgerTotalEntries = ledgerInfo.getEntries();
116+
117+
// Adjust ledger size and total entry count if this is the last active ledger since the
118+
// ledger metadata doesn't include the current ledger's size and entry count
119+
// the lastLedgerId is null in ReadOnlyManagedLedgerImpl
120+
if (lastLedgerId != null && ledgerId == lastLedgerId.longValue()
121+
&& lastLedgerTotalSize > 0 && lastLedgerTotalEntries > 0) {
122+
ledgerTotalSize = lastLedgerTotalSize;
123+
ledgerTotalEntries = lastLedgerTotalEntries;
124+
}
125+
126+
// Skip processing ledgers that have no entries or size
127+
if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
128+
continue;
129+
}
130+
131+
// Update the average entry size based on the current ledger's size and entry count
132+
currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
133+
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
134+
135+
// Calculate the total size of this ledger, inclusive of bookkeeping overhead per entry
136+
long ledgerTotalSizeWithBkOverhead =
137+
ledgerTotalSize + ledgerTotalEntries * BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
138+
139+
// If the remaining bytes are insufficient to read the full ledger, estimate the readable entries
140+
// or when the read position is beyond the first entry in the ledger
141+
if (remainingBytesSize < ledgerTotalSizeWithBkOverhead
142+
|| readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0) {
143+
long entryCount;
144+
if (readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0) {
145+
entryCount = Math.max(ledgerTotalEntries - readPosition.getEntryId(), 1);
146+
} else {
147+
entryCount = ledgerTotalEntries;
148+
}
149+
// Estimate how many entries can fit within the remaining bytes
150+
long entriesToRead = Math.min(Math.max(1, remainingBytesSize / currentAvgSize), entryCount);
151+
estimatedEntryCount += entriesToRead;
152+
remainingBytesSize -= entriesToRead * currentAvgSize;
153+
} else {
154+
// If the full ledger can be read, add all its entries to the count and reduce its size
155+
estimatedEntryCount += ledgerTotalEntries;
156+
remainingBytesSize -= ledgerTotalSizeWithBkOverhead;
157+
}
158+
}
159+
160+
// Add any remaining bytes to the estimated entry count considering the current average entry size
161+
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
162+
estimatedEntryCount += remainingBytesSize / currentAvgSize;
163+
}
164+
165+
// Ensure at least one entry is always returned as the result
166+
return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
167+
}
168+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static java.util.Objects.requireNonNull;
2323
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
24+
import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
2425
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
2526
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
2627
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
27-
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2828
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
2929
import com.google.common.annotations.VisibleForTesting;
3030
import com.google.common.base.MoreObjects;
@@ -3711,53 +3711,8 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
37113711
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
37123712
return maxEntries;
37133713
}
3714-
long estimatedEntryCount = estimateEntryCountBySize(maxSizeBytes, readPosition, ledger);
3715-
if (estimatedEntryCount > Integer.MAX_VALUE) {
3716-
return maxEntries;
3717-
}
3718-
return Math.min((int) estimatedEntryCount, maxEntries);
3719-
}
3720-
3721-
static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) {
3722-
Position posToRead = readPosition;
3723-
if (!ml.isValidPosition(readPosition)) {
3724-
posToRead = ml.getNextValidPosition(readPosition);
3725-
}
3726-
long result = 0;
3727-
long remainingBytesSize = bytesSize;
3728-
3729-
while (remainingBytesSize > 0) {
3730-
// Last ledger.
3731-
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
3732-
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
3733-
// Only read 1 entry if no entries to read.
3734-
return 1;
3735-
}
3736-
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
3737-
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3738-
result += remainingBytesSize / avg;
3739-
break;
3740-
}
3741-
// Skip empty ledger.
3742-
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
3743-
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
3744-
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
3745-
continue;
3746-
}
3747-
// Calculate entries by average of ledgers.
3748-
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3749-
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
3750-
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
3751-
result += remainingBytesSize / avg;
3752-
break;
3753-
} else {
3754-
// Calculate for the next ledger.
3755-
result += remainEntriesOfLedger;
3756-
remainingBytesSize -= remainEntriesOfLedger * avg;
3757-
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
3758-
}
3759-
}
3760-
return Math.max(result, 1);
3714+
int estimatedEntryCount = estimateEntryCountByBytesSize(maxEntries, maxSizeBytes, readPosition, ledger);
3715+
return Math.min(estimatedEntryCount, maxEntries);
37613716
}
37623717

37633718
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class RangeEntryCacheImpl implements EntryCache {
5858
* Overhead per-entry to take into account the envelope.
5959
*/
6060
public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
61-
private static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024;
61+
public static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024;
6262
private static final boolean DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY = false;
6363

6464
private final RangeEntryCacheManagerImpl manager;

0 commit comments

Comments
 (0)