|
22 | 22 | import java.util.Collection; |
23 | 23 | import java.util.Map; |
24 | 24 | import java.util.NavigableMap; |
| 25 | +import java.util.NoSuchElementException; |
25 | 26 | import org.apache.bookkeeper.client.LedgerHandle; |
26 | 27 | import org.apache.bookkeeper.mledger.Position; |
27 | 28 | import org.apache.bookkeeper.mledger.PositionFactory; |
@@ -82,17 +83,15 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt |
82 | 83 | return maxEntries; |
83 | 84 | } |
84 | 85 |
|
85 | | - // Adjust the read position to ensure it falls within the valid range of available ledgers. |
86 | | - // This handles special cases such as EARLIEST and LATEST positions by resetting them |
87 | | - // to the first available ledger or the last active ledger, respectively. |
88 | | - if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) { |
89 | | - readPosition = PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0)); |
90 | | - } else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) { |
91 | | - Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry(); |
92 | | - readPosition = |
93 | | - PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0)); |
94 | | - } else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) { |
95 | | - readPosition = PositionFactory.create(ledgersInfo.firstKey(), 0); |
| 86 | + if (ledgersInfo.isEmpty()) { |
| 87 | + return 1; |
| 88 | + } |
| 89 | + |
| 90 | + try { |
| 91 | + readPosition = adjustReadPosition(readPosition, ledgersInfo, lastLedgerId, lastLedgerTotalEntries); |
| 92 | + } catch (NoSuchElementException e) { |
| 93 | + // there was a race condition where ledgersInfo became empty just before adjustReadPosition was called |
| 94 | + return 1; |
96 | 95 | } |
97 | 96 |
|
98 | 97 | long estimatedEntryCount = 0; |
@@ -183,4 +182,28 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt |
183 | 182 | // Ensure at least one entry is always returned as the result |
184 | 183 | return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1); |
185 | 184 | } |
| 185 | + |
| 186 | + private static Position adjustReadPosition(Position readPosition, |
| 187 | + NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> |
| 188 | + ledgersInfo, |
| 189 | + Long lastLedgerId, long lastLedgerTotalEntries) { |
| 190 | + // Adjust the read position to ensure it falls within the valid range of available ledgers. |
| 191 | + // This handles special cases such as EARLIEST and LATEST positions by resetting them |
| 192 | + // to the first available ledger or the last active ledger, respectively. |
| 193 | + if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) { |
| 194 | + return PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0)); |
| 195 | + } |
| 196 | + long lastKey = ledgersInfo.lastKey(); |
| 197 | + if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) { |
| 198 | + Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry(); |
| 199 | + if (lastEntry != null && lastEntry.getKey() == lastKey) { |
| 200 | + return PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0)); |
| 201 | + } |
| 202 | + } |
| 203 | + long firstKey = ledgersInfo.firstKey(); |
| 204 | + if (readPosition.getLedgerId() < firstKey) { |
| 205 | + return PositionFactory.create(firstKey, 0); |
| 206 | + } |
| 207 | + return readPosition; |
| 208 | + } |
186 | 209 | } |
0 commit comments