@@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -3810,26 +3811,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
-
- double avgEntrySize = ledger.getStats().getEntrySizeAverage();
- if (!Double.isFinite(avgEntrySize)) {
- // We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
- avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
- }
-
- if (!Double.isFinite(avgEntrySize)) {
- // If we still don't have any information, it means this is the first time we attempt reading
- // and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
- return 1;
- }
-
- int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
- if (maxEntriesBasedOnSize < 1) {
- // We need to read at least one entry
- return 1;
- }
-
- return Math.min(maxEntriesBasedOnSize, maxEntries);
+ int maxEntriesBasedOnSize =
+ Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
+ return Math.min(maxEntriesBasedOnSize, maxEntries);
+ }
+
+ static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) {
+ Position posToRead = readPosition;
+ if (!ml.isValidPosition(readPosition)) {
+ posToRead = ml.getNextValidPosition(readPosition);
+ }
+ long result = 0;
+ long remainingBytesSize = bytesSize;
+
+ while (remainingBytesSize > 0) {
+ // Last ledger.
+ if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
+ if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
+ // Only read 1 entry if no entries to read.
+ return 1;
+ }
+ long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
+ + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ result += remainingBytesSize / avg;
+ break;
+ }
+ // Skip empty ledger.
+ LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
+ if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
+ posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
+ continue;
+ }
+ // Calculate entries by average of ledgers.
+ long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
+ if (remainEntriesOfLedger * avg >= remainingBytesSize) {
+ result += remainingBytesSize / avg;
+ break;
+ } else {
+ // Calculate for the next ledger.
+ result += remainEntriesOfLedger;
+ remainingBytesSize -= remainEntriesOfLedger * avg;
+ posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
+ }
+ }
+
+ return Math.max(result, 1);
}

@Override
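For reference, a rough worked example of how the new size-based estimation walks a ledger (an illustrative sketch only, not part of the patch; the 64-byte overhead is an assumed stand-in for BOOKKEEPER_READ_OVERHEAD_PER_ENTRY and all numbers are made up):

class EstimateSketch {
    // Assumed stand-in for RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY (value is an assumption).
    static final long OVERHEAD = 64;

    public static void main(String[] args) {
        // One closed ledger: 100 entries, 200 bytes of payload; the read starts at entry 80.
        long ledgerSize = 200;
        long ledgerEntries = 100;
        long avg = Math.max(1, ledgerSize / ledgerEntries) + OVERHEAD; // 2 + 64 = 66 bytes per entry
        long remainEntriesOfLedger = ledgerEntries - 80;               // 20 entries left in this ledger
        long remainingBytesSize = 30 * avg;                            // byte budget worth ~30 entries
        // All 20 remaining entries fit in the budget, so they are counted and the loop would
        // carry the rest of the budget (about 10 entries' worth) into the next ledger.
        long counted = Math.min(remainEntriesOfLedger, remainingBytesSize / avg);
        System.out.println(counted + " entries from this ledger, "
                + (remainingBytesSize - counted * avg) + " bytes of budget carried over");
    }
}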
@@ -224,6 +224,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionFactory.LATEST);
@VisibleForTesting
@Getter
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
protected volatile long currentLedgerSize = 0;
@@ -303,7 +303,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
originalCallback, ctx);
} else {
- long estimatedEntrySize = getEstimatedEntrySize();
+ long estimatedEntrySize = getEstimatedEntrySize(lh);
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
if (log.isDebugEnabled()) {
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
@@ -419,12 +419,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position
}

@VisibleForTesting
- public long getEstimatedEntrySize() {
- long estimatedEntrySize = getAvgEntrySize();
- if (estimatedEntrySize == 0) {
- estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
- }
- return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ public long getEstimatedEntrySize(ReadHandle lh) {
+ if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
+ // No entries stored.
+ return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+ }
+ return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}

private long getAvgEntrySize() {
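A small sketch of what the new per-read estimate amounts to (illustrative only, not part of the patch; the 64-byte overhead is an assumed stand-in for BOOKKEEPER_READ_OVERHEAD_PER_ENTRY and the numbers are made up):

class EntrySizeSketch {
    // Assumed stand-in for BOOKKEEPER_READ_OVERHEAD_PER_ENTRY (value is an assumption).
    static final long OVERHEAD = 64;

    public static void main(String[] args) {
        // The ReadHandle being read: lastAddConfirmed = 99, i.e. 100 entries, 100 KiB of data in total.
        long length = 100 * 1024;
        long lastAddConfirmed = 99;
        long estimatedEntrySize = Math.max(1, length / (lastAddConfirmed + 1)) + OVERHEAD; // 1024 + 64
        long estimatedReadSize = 50 * estimatedEntrySize; // bytes reserved before reading 50 entries
        System.out.println(estimatedEntrySize + " bytes/entry, " + estimatedReadSize + " bytes reserved");
    }
}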
@@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
- Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
- Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
Awaitility.await().untilAsserted(() -> {
- long remainingBytes =limiter.getRemainingBytes();
+ long remainingBytes = limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
});
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
@@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
}).start();

- long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
+ long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
log.info("acquired : {}", bytesAcquired1);
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
@@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
- Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
- Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
- long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
+ long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
@@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception {

readCompleteSignal2.countDown();
cb2.entries.join();
- Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
- Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
@@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
}

private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
- return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ return entriesCount * perEntrySize;
}

class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
@@ -686,13 +687,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");

for (int i = 0; i < 100; i++) {
- ledger.addEntry(new byte[1024]);
+ ledger.addEntry(new byte[(int) (1024)]);
}

- // First time, since we don't have info, we'll get 1 single entry
- readAndCheck(cursor, 10, 3 * 1024, 1);
+ // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
+ // now gets more messages than before (previously it received only 1 message at the first delivery).
+ int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
+ readAndCheck(cursor, 10, 3 * avg, 3);
// We should only return 3 entries, based on the max size
- readAndCheck(cursor, 20, 3 * 1024, 3);
+ readAndCheck(cursor, 20, 3 * avg, 3);
// If maxSize is < avg, we should get 1 entry
readAndCheck(cursor, 10, 500, 1);
}
@@ -3914,13 +3917,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
ledger.addEntry(new byte[1024]);
}

- // First time, since we don't have info, we'll get 1 single entry
- List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
- assertEquals(entries.size(), 1);
+ // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
+ // now gets more messages than before (previously it received only 1 message at the first delivery).
+ int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+ List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
+ assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

// We should only return 3 entries, based on the max size
- entries = c.readEntriesOrWait(10, 3 * 1024);
+ entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

@@ -5164,6 +5169,83 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
assertEquals(positionRef4.get(), position4);
}

@Test
public void testEstimateEntryCountBySize() throws Exception {
final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
long entryCount0 =
ManagedCursorImpl.estimateEntryCountBySize(16, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml);
assertEquals(entryCount0, 1);
// Avoid trimming ledgers.
ml.openCursor("c1");

// Build data.
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1});
}
long ledger1 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2});
}
long ledger2 = ml.getCurrentLedger().getId();
ml.getCurrentLedger().close();
ml.ledgerClosed(ml.getCurrentLedger());
for (int i = 0; i < 100; i++) {
ml.addEntry(new byte[]{1, 2, 3, 4});
}
long ledger3 = ml.getCurrentLedger().getId();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1);
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2);
long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);

// Test: the individual ledgers.
long entryCount1 =
ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionFactory.create(ledger1, 0), ml);
assertEquals(entryCount1, 16);
long entryCount2 =
ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionFactory.create(ledger2, 0), ml);
assertEquals(entryCount2, 8);
long entryCount3 =
ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionFactory.create(ledger3, 0), ml);
assertEquals(entryCount3, 4);

// Test: across ledgers.
long entryCount4 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionFactory.create(ledger1, 0), ml);
assertEquals(entryCount4, 108);
long entryCount5 =
ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionFactory.create(ledger2, 0), ml);
assertEquals(entryCount5, 104);
long entryCount6 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml);
assertEquals(entryCount6, 204);

long entryCount7 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionFactory.create(ledger1, 80), ml);
assertEquals(entryCount7, 28);
long entryCount8 =
ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionFactory.create(ledger2, 80), ml);
assertEquals(entryCount8, 24);
long entryCount9 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 80), ml);
assertEquals(entryCount9, 124);

// Test: read more than entries written.
long entryCount10 =
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml);
assertEquals(entryCount10, 304);

// cleanup.
ml.delete();
}

@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
@@ -85,7 +85,7 @@ public void testBatchMessageAck() {
.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
- .receiverQueueSize(10)
+ .receiverQueueSize(50)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -114,27 +114,29 @@ public void testBatchMessageAck() {
consumer.acknowledge(receive1);
consumer.acknowledge(receive2);
Awaitility.await().untilAsserted(() -> {
- assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
+ // Since https://github.com/apache/pulsar/pull/23931 improved the mechanism for estimating the average
+ // entry size, the broker delivers more messages than before, so the expected count changes from 18 to 38.
+ assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
});
Message<byte[]> receive3 = consumer.receive();
Message<byte[]> receive4 = consumer.receive();
consumer.acknowledge(receive3);
consumer.acknowledge(receive4);
Awaitility.await().untilAsserted(() -> {
- assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+ assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
});
// Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436.
consumer.pause();
Message<byte[]> receive5 = consumer.receive();
consumer.negativeAcknowledge(receive5);
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
+ assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
});
// Unblock cmd-flow.
consumer.resume();
consumer.receive();
Awaitility.await().untilAsserted(() -> {
- assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+ assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
});
}

@@ -445,8 +445,13 @@ public void testAvgMessagesPerEntry() throws Exception {
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(Integer.MAX_VALUE)
.create();

producer.send("first-message");
// The first messages deliver: 20 msgs.
// Average of "messages per batch" is "1".
for (int i = 0; i < 20; i++) {
producer.send("first-message");
}
// The second messages deliver: 20 msgs.
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
futures.add(producer.sendAsync("message"));
@@ -480,6 +485,7 @@ public void testAvgMessagesPerEntry() throws Exception {
metadataConsumer.put("matchValueReschedule", "producer2");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
.receiverQueueSize(20)
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

int counter = 0;
@@ -494,14 +500,17 @@ public void testAvgMessagesPerEntry() throws Exception {
}
}

- assertEquals(21, counter);
+ assertEquals(40, counter);

ConsumerStats consumerStats =
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);

- assertEquals(21, consumerStats.getMsgOutCounter());
+ assertEquals(40, consumerStats.getMsgOutCounter());

- // Math.round(1 * 0.9 + 0.1 * (20 / 1))
+ // The first delivery: 20 messages, one per entry, so the average of "messages per batch" is 1.
+ // The second delivery: 20 messages in one batched entry, so the average becomes
+ // Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3.
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);
}
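A sketch of the rolling average the comments above refer to (illustrative only; the 0.9/0.1 weights are taken from the arithmetic written in the test comments and are an assumption, not verified against the broker code):

class AvgMessagesPerEntrySketch {
    public static void main(String[] args) {
        double avg = 1.0;              // after the first 20 single-message sends: one message per entry
        int messagesInNewEntry = 20;   // the second round of 20 messages lands in one batched entry
        avg = avg * 0.9 + messagesInNewEntry * 0.1;
        System.out.println(Math.round(avg)); // Math.round(2.9) = 3, matching the asserted value
    }
}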