diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 478c6a1b37976..022cecf8d57b5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2890,6 +2890,54 @@ public void deleteFailed(ManagedLedgerException ex, Object ctx) { }, null); } + /** + * Manually acknowledge all entries from startPosition to endPosition. + * - Since this is an uncommon event, we focus on maintainability. So we do not modify + * {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call + * {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}. + * - This method is valid regardless of the consumer ACK type. + * - If there is a consumer ack request after this event, it will also work. + */ + public void skipNonRecoverableEntries(Position startPosition, Position endPosition){ + long ledgerId = startPosition.getLedgerId(); + LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId); + if (ledgerInfo == null) { + return; + } + + long startEntryId = Math.max(0, startPosition.getEntryId()); + long endEntryId = ledgerId != endPosition.getLedgerId() ? ledgerInfo.getEntries() : endPosition.getEntryId(); + if (startEntryId >= endEntryId) { + return; + } + + lock.writeLock().lock(); + log.warn("[{}] [{}] Since these entry for ledger [{}] is lost and the autoSkipNonRecoverableData is true, " + + "these entries [{}:{}) will be auto acknowledge in subscription", + ledger.getName(), name, ledgerId, startEntryId, endEntryId); + try { + for (long i = startEntryId; i < endEntryId; i++) { + if (!individualDeletedMessages.contains(ledgerId, i)) { + asyncDelete(PositionFactory.create(ledgerId, i), new AsyncCallbacks.DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + // ignore. + } + + @Override + public void deleteFailed(ManagedLedgerException ex, Object ctx) { + // The method internalMarkDelete already handled the failure operation. We only need to + // make sure the memory state is updated. + // If the broker crashed, the non-recoverable ledger will be detected again. + } + }, null); + } + } + } finally { + lock.writeLock().unlock(); + } + } + // ////////////////////////////////////////////////// void startCreatingNewMetadataLedger() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 3fd7e36c433ae..a4928b44bd97d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -137,6 +137,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { updateReadPosition(nexReadPosition); if (lostLedger != null) { cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger); + } else { + cursor.skipNonRecoverableEntries(readPosition, nexReadPosition); } checkReadCompletion(); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index b4ab673facb26..69b74fcf8f5c1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4766,6 +4766,53 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } + @Test + public void testSkipNonRecoverableEntries() throws ManagedLedgerException, InterruptedException { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + int maxMessagePerLedger = 10; + managedLedgerConfig.setMaxEntriesPerLedger(maxMessagePerLedger); + ManagedLedger ledger = factory.open("testSkipNonRecoverableEntries", managedLedgerConfig); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my-cursor"); + + Position lacPosition = ledger.getLastConfirmedEntry(); + long ledgerId = lacPosition.getLedgerId(); + assertEquals(PositionFactory.create(ledgerId, -1), cursor.getMarkDeletedPosition()); + + // Mock add 10 entry + for (int i = 0; i < 10; i++) { + ledger.addEntry(String.valueOf(i).getBytes()); + } + + // read 2 entry and delete these entries, MarkDeletedPosition move forward + List entries = cursor.readEntries(2); + for (Entry entry : entries) { + cursor.delete(entry.getPosition()); + } + assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition()); + + // read the next 6 entry and not delete, MarkDeletedPosition not move forward + entries = cursor.readEntries(6); + assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition()); + + // delete last read entry, MarkDeletedPosition not move forward + Entry lastEntry = entries.get(entries.size() - 1); + cursor.delete(lastEntry.getPosition()); + assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition()); + + // call skip entries, MarkDeletedPosition move forward + cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(), + PositionFactory.create(ledgerId, lastEntry.getEntryId())); + assertEquals(PositionFactory.create(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition()); + + // repeat call skip entries, MarkDeletedPosition not change + cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(), + PositionFactory.create(ledgerId, lastEntry.getEntryId())); + assertEquals(PositionFactory.create(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition()); + + cursor.close(); + ledger.close(); + } + @Test public void testRecoverCursorWithTerminateManagedLedger() throws Exception { String mlName = "my_test_ledger";