Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,54 @@ public void deleteFailed(ManagedLedgerException ex, Object ctx) {
}
}

/**
* Manually acknowledge all entries from startPosition to endPosition.
* - Since this is an uncommon event, we focus on maintainability. So we do not modify
* {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
* {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
* - This method is valid regardless of the consumer ACK type.
* - If there is a consumer ack request after this event, it will also work.
*/
public void skipNonRecoverableEntries(Position startPosition, Position endPosition){
long ledgerId = startPosition.getLedgerId();
LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
if (ledgerInfo == null) {
return;
}

long startEntryId = Math.max(0, startPosition.getEntryId());
long endEntryId = ledgerId != endPosition.getLedgerId() ? ledgerInfo.getEntries() : endPosition.getEntryId();
if (startEntryId >= endEntryId) {
return;
}

lock.writeLock().lock();
log.warn("[{}] [{}] Since these entry for ledger [{}] is lost and the autoSkipNonRecoverableData is true, "
+ "these entries [{}:{}) will be auto acknowledge in subscription",
ledger.getName(), name, ledgerId, startEntryId, endEntryId);
try {
for (long i = startEntryId; i < endEntryId; i++) {
if (!individualDeletedMessages.contains(ledgerId, i)) {
asyncDelete(PositionImpl.get(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
// ignore.
}

@Override
public void deleteFailed(ManagedLedgerException ex, Object ctx) {
// The method internalMarkDelete already handled the failure operation. We only need to
// make sure the memory state is updated.
// If the broker crashed, the non-recoverable ledger will be detected again.
}
}, null);
}
}
} finally {
lock.writeLock().unlock();
}
}

// //////////////////////////////////////////////////

void startCreatingNewMetadataLedger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
updateReadPosition(nexReadPosition);
if (lostLedger != null) {
cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger);
} else {
cursor.skipNonRecoverableEntries(readPosition, nexReadPosition);
}
checkReadCompletion();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4484,5 +4484,52 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}

@Test
public void testSkipNonRecoverableEntries() throws ManagedLedgerException, InterruptedException {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
int maxMessagePerLedger = 10;
managedLedgerConfig.setMaxEntriesPerLedger(maxMessagePerLedger);
ManagedLedger ledger = factory.open("testSkipNonRecoverableEntries", managedLedgerConfig);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my-cursor");

Position lacPosition = ledger.getLastConfirmedEntry();
long ledgerId = lacPosition.getLedgerId();
assertEquals(PositionImpl.get(ledgerId, -1), cursor.getMarkDeletedPosition());

// Mock add 10 entry
for (int i = 0; i < 10; i++) {
ledger.addEntry(String.valueOf(i).getBytes());
}

// read 2 entry and delete these entries, MarkDeletedPosition move forward
List<Entry> entries = cursor.readEntries(2);
for (Entry entry : entries) {
cursor.delete(entry.getPosition());
}
assertEquals(PositionImpl.get(ledgerId, 1), cursor.getMarkDeletedPosition());

// read the next 6 entry and not delete, MarkDeletedPosition not move forward
entries = cursor.readEntries(6);
assertEquals(PositionImpl.get(ledgerId, 1), cursor.getMarkDeletedPosition());

// delete last read entry, MarkDeletedPosition not move forward
Entry lastEntry = entries.get(entries.size() - 1);
cursor.delete(lastEntry.getPosition());
assertEquals(PositionImpl.get(ledgerId, 1), cursor.getMarkDeletedPosition());

// call skip entries, MarkDeletedPosition move forward
cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(),
PositionImpl.get(ledgerId, lastEntry.getEntryId()));
assertEquals(PositionImpl.get(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition());

// repeat call skip entries, MarkDeletedPosition not change
cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(),
PositionImpl.get(ledgerId, lastEntry.getEntryId()));
assertEquals(PositionImpl.get(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition());

cursor.close();
ledger.close();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}