diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 203d48933f0a5..32d46ff1c3c40 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -3810,26 +3811,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; } + int maxEntriesBasedOnSize = + Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue(); + return Math.min(maxEntriesBasedOnSize, maxEntries); + } - double avgEntrySize = ledger.getStats().getEntrySizeAverage(); - if (!Double.isFinite(avgEntrySize)) { - // We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats - avgEntrySize = (double) entriesReadSize / (double) entriesReadCount; - } - - if (!Double.isFinite(avgEntrySize)) { - // If we still don't have any information, it means this is the first time we attempt reading - // and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats - return 1; + static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) { + Position posToRead = readPosition; + if (!ml.isValidPosition(readPosition)) { + posToRead = ml.getNextValidPosition(readPosition); } + long result = 0; + long remainingBytesSize = bytesSize; - int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize); - if (maxEntriesBasedOnSize < 1) { - // We need to read at least one entry - return 1; + while (remainingBytesSize > 0) { + // Last ledger. + if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) { + if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) { + // Only read 1 entry if no entries to read. + return 1; + } + long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries()) + + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + result += remainingBytesSize / avg; + break; + } + // Skip empty ledger. + LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId()); + if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) { + posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE)); + continue; + } + // Calculate entries by average of ledgers. + long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId(); + if (remainEntriesOfLedger * avg >= remainingBytesSize) { + result += remainingBytesSize / avg; + break; + } else { + // Calculate for the next ledger. + result += remainEntriesOfLedger; + remainingBytesSize -= remainEntriesOfLedger * avg; + posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE)); + } } - - return Math.min(maxEntriesBasedOnSize, maxEntries); + return Math.max(result, 1); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7426059e576f6..c890ba01f634e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -224,6 +224,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); public static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionFactory.LATEST); + @VisibleForTesting + @Getter protected volatile LedgerHandle currentLedger; protected volatile long currentLedgerEntries = 0; protected volatile long currentLedgerSize = 0; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index b81015ea63988..9a2de9ba8c41d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -303,7 +303,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, originalCallback, ctx); } else { - long estimatedEntrySize = getEstimatedEntrySize(); + long estimatedEntrySize = getEstimatedEntrySize(lh); long estimatedReadSize = numberOfEntries * estimatedEntrySize; if (log.isDebugEnabled()) { log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size", @@ -419,12 +419,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio } @VisibleForTesting - public long getEstimatedEntrySize() { - long estimatedEntrySize = getAvgEntrySize(); - if (estimatedEntrySize == 0) { - estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE; + public long getEstimatedEntrySize(ReadHandle lh) { + if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) { + // No entries stored. + return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; } - return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; } private long getAvgEntrySize() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java index 48f0cf08ddff4..6676baf8b555a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java @@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception { SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback(); entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx); cb0.entries.join(); - Long sizePerEntry1 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue(); Awaitility.await().untilAsserted(() -> { - long remainingBytes =limiter.getRemainingBytes(); + long remainingBytes = limiter.getRemainingBytes(); Assert.assertEquals(remainingBytes, totalCapacity); }); log.info("remainingBytes 0: {}", limiter.getRemainingBytes()); @@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx); }).start(); - long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1); + long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry); long remainingBytesExpected1 = totalCapacity - bytesAcquired1; log.info("acquired : {}", bytesAcquired1); log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1); @@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { Thread.sleep(3000); readCompleteSignal1.countDown(); cb1.entries.join(); - Long sizePerEntry2 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); - long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1); + long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry); long remainingBytesExpected2 = totalCapacity - bytesAcquired2; log.info("acquired : {}", bytesAcquired2); log.info("remainingBytesExpected 1: {}", remainingBytesExpected2); @@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception { readCompleteSignal2.countDown(); cb2.entries.join(); - Long sizePerEntry3 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); Awaitility.await().untilAsserted(() -> { long remainingBytes = limiter.getRemainingBytes(); log.info("remainingBytes 2: {}", remainingBytes); @@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception { } private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) { - return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + return entriesCount * perEntrySize; } class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d3ea98131ad8f..1cb09d995393c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -686,13 +687,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception { ManagedCursor cursor = ledger.openCursor("c1"); for (int i = 0; i < 100; i++) { - ledger.addEntry(new byte[1024]); + ledger.addEntry(new byte[(int) (1024)]); } - // First time, since we don't have info, we'll get 1 single entry - readAndCheck(cursor, 10, 3 * 1024, 1); + // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer + // will get more messages than before(it only receives 1 messages at the first delivery), + int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024); + readAndCheck(cursor, 10, 3 * avg, 3); // We should only return 3 entries, based on the max size - readAndCheck(cursor, 20, 3 * 1024, 3); + readAndCheck(cursor, 20, 3 * avg, 3); // If maxSize is < avg, we should get 1 entry readAndCheck(cursor, 10, 500, 1); } @@ -3914,13 +3917,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception { ledger.addEntry(new byte[1024]); } - // First time, since we don't have info, we'll get 1 single entry - List entries = c.readEntriesOrWait(10, 3 * 1024); - assertEquals(entries.size(), 1); + // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer + // will get more messages than before(it only receives 1 messages at the first delivery), + int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + List entries = c.readEntriesOrWait(10, 3 * avg); + assertEquals(entries.size(), 3); entries.forEach(Entry::release); // We should only return 3 entries, based on the max size - entries = c.readEntriesOrWait(10, 3 * 1024); + entries = c.readEntriesOrWait(10, 3 * avg); assertEquals(entries.size(), 3); entries.forEach(Entry::release); @@ -5164,6 +5169,83 @@ public void findEntryFailed(ManagedLedgerException exception, Optional assertEquals(positionRef4.get(), position4); } + @Test + public void testEstimateEntryCountBySize() throws Exception { + final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", ""); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); + long entryCount0 = + ManagedCursorImpl.estimateEntryCountBySize(16, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml); + assertEquals(entryCount0, 1); + // Avoid trimming ledgers. + ml.openCursor("c1"); + + // Build data. + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1}); + } + long ledger1 = ml.getCurrentLedger().getId(); + ml.getCurrentLedger().close(); + ml.ledgerClosed(ml.getCurrentLedger()); + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1, 2}); + } + long ledger2 = ml.getCurrentLedger().getId(); + ml.getCurrentLedger().close(); + ml.ledgerClosed(ml.getCurrentLedger()); + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1, 2, 3, 4}); + } + long ledger3 = ml.getCurrentLedger().getId(); + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1); + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2); + long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + + // Test: the individual ledgers. + long entryCount1 = + ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionFactory.create(ledger1, 0), ml); + assertEquals(entryCount1, 16); + long entryCount2 = + ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionFactory.create(ledger2, 0), ml); + assertEquals(entryCount2, 8); + long entryCount3 = + ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionFactory.create(ledger3, 0), ml); + assertEquals(entryCount3, 4); + + // Test: across ledgers. + long entryCount4 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionFactory.create(ledger1, 0), ml); + assertEquals(entryCount4, 108); + long entryCount5 = + ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionFactory.create(ledger2, 0), ml); + assertEquals(entryCount5, 104); + long entryCount6 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml); + assertEquals(entryCount6, 204); + + long entryCount7 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionFactory.create(ledger1, 80), ml); + assertEquals(entryCount7, 28); + long entryCount8 = + ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionFactory.create(ledger2, 80), ml); + assertEquals(entryCount8, 24); + long entryCount9 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 80), ml); + assertEquals(entryCount9, 124); + + // Test: read more than entries written. + long entryCount10 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionFactory.create(ledger1, 0), ml); + assertEquals(entryCount10, 304); + + // cleanup. + ml.delete(); + } + @Test void testForceCursorRecovery() throws Exception { TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 7fa7bf078e0c5..f21ac130e3cfd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -85,7 +85,7 @@ public void testBatchMessageAck() { .newConsumer() .topic(topicName) .subscriptionName(subscriptionName) - .receiverQueueSize(10) + .receiverQueueSize(50) .subscriptionType(SubscriptionType.Shared) .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -114,27 +114,29 @@ public void testBatchMessageAck() { consumer.acknowledge(receive1); consumer.acknowledge(receive2); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18); + // Since https://github.com/apache/pulsar/pull/23931 improved the mechanism of estimate average entry size, + // broker will deliver much messages than before. So edit 18 -> 38 here. + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38); }); Message receive3 = consumer.receive(); Message receive4 = consumer.receive(); consumer.acknowledge(receive3); consumer.acknowledge(receive4); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36); }); // Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436. consumer.pause(); Message receive5 = consumer.receive(); consumer.negativeAcknowledge(receive5); Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20); }); // Unblock cmd-flow. consumer.resume(); consumer.receive(); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 59a911500e5d9..fc650127f90a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -445,8 +445,13 @@ public void testAvgMessagesPerEntry() throws Exception { .batchingMaxPublishDelay(5, TimeUnit.SECONDS) .batchingMaxBytes(Integer.MAX_VALUE) .create(); - - producer.send("first-message"); + // The first messages deliver: 20 msgs. + // Average of "messages per batch" is "1". + for (int i = 0; i < 20; i++) { + producer.send("first-message"); + } + // The second messages deliver: 20 msgs. + // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3". List> futures = new ArrayList<>(); for (int i = 0; i < 20; i++) { futures.add(producer.sendAsync("message")); @@ -480,6 +485,7 @@ public void testAvgMessagesPerEntry() throws Exception { metadataConsumer.put("matchValueReschedule", "producer2"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer) + .receiverQueueSize(20) .subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); int counter = 0; @@ -494,14 +500,17 @@ public void testAvgMessagesPerEntry() throws Exception { } } - assertEquals(21, counter); + assertEquals(40, counter); ConsumerStats consumerStats = admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0); - assertEquals(21, consumerStats.getMsgOutCounter()); + assertEquals(40, consumerStats.getMsgOutCounter()); - // Math.round(1 * 0.9 + 0.1 * (20 / 1)) + // The first messages deliver: 20 msgs. + // Average of "messages per batch" is "1". + // The second messages deliver: 20 msgs. + // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3". int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry(); assertEquals(3, avgMessagesPerEntry); }