Skip to content

Commit 3761dc4

Browse files
hanmzlhotari
andauthored
[fix][broker] Avoid block markDeletePosition forward when skip lost entries (#21210)
Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]>
1 parent 8d402f4 commit 3761dc4

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2890,6 +2890,54 @@ public void deleteFailed(ManagedLedgerException ex, Object ctx) {
28902890
}, null);
28912891
}
28922892

2893+
/**
2894+
* Manually acknowledge all entries from startPosition to endPosition.
2895+
* - Since this is an uncommon event, we focus on maintainability. So we do not modify
2896+
* {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
2897+
* {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
2898+
* - This method is valid regardless of the consumer ACK type.
2899+
* - If there is a consumer ack request after this event, it will also work.
2900+
*/
2901+
public void skipNonRecoverableEntries(Position startPosition, Position endPosition){
2902+
long ledgerId = startPosition.getLedgerId();
2903+
LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
2904+
if (ledgerInfo == null) {
2905+
return;
2906+
}
2907+
2908+
long startEntryId = Math.max(0, startPosition.getEntryId());
2909+
long endEntryId = ledgerId != endPosition.getLedgerId() ? ledgerInfo.getEntries() : endPosition.getEntryId();
2910+
if (startEntryId >= endEntryId) {
2911+
return;
2912+
}
2913+
2914+
lock.writeLock().lock();
2915+
log.warn("[{}] [{}] Since these entry for ledger [{}] is lost and the autoSkipNonRecoverableData is true, "
2916+
+ "these entries [{}:{}) will be auto acknowledge in subscription",
2917+
ledger.getName(), name, ledgerId, startEntryId, endEntryId);
2918+
try {
2919+
for (long i = startEntryId; i < endEntryId; i++) {
2920+
if (!individualDeletedMessages.contains(ledgerId, i)) {
2921+
asyncDelete(PositionFactory.create(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
2922+
@Override
2923+
public void deleteComplete(Object ctx) {
2924+
// ignore.
2925+
}
2926+
2927+
@Override
2928+
public void deleteFailed(ManagedLedgerException ex, Object ctx) {
2929+
// The method internalMarkDelete already handled the failure operation. We only need to
2930+
// make sure the memory state is updated.
2931+
// If the broker crashed, the non-recoverable ledger will be detected again.
2932+
}
2933+
}, null);
2934+
}
2935+
}
2936+
} finally {
2937+
lock.writeLock().unlock();
2938+
}
2939+
}
2940+
28932941
// //////////////////////////////////////////////////
28942942

28952943
void startCreatingNewMetadataLedger() {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
137137
updateReadPosition(nexReadPosition);
138138
if (lostLedger != null) {
139139
cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger);
140+
} else {
141+
cursor.skipNonRecoverableEntries(readPosition, nexReadPosition);
140142
}
141143
checkReadCompletion();
142144
} else {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4766,6 +4766,53 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
47664766
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
47674767
}
47684768

4769+
@Test
4770+
public void testSkipNonRecoverableEntries() throws ManagedLedgerException, InterruptedException {
4771+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
4772+
int maxMessagePerLedger = 10;
4773+
managedLedgerConfig.setMaxEntriesPerLedger(maxMessagePerLedger);
4774+
ManagedLedger ledger = factory.open("testSkipNonRecoverableEntries", managedLedgerConfig);
4775+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my-cursor");
4776+
4777+
Position lacPosition = ledger.getLastConfirmedEntry();
4778+
long ledgerId = lacPosition.getLedgerId();
4779+
assertEquals(PositionFactory.create(ledgerId, -1), cursor.getMarkDeletedPosition());
4780+
4781+
// Mock add 10 entry
4782+
for (int i = 0; i < 10; i++) {
4783+
ledger.addEntry(String.valueOf(i).getBytes());
4784+
}
4785+
4786+
// read 2 entry and delete these entries, MarkDeletedPosition move forward
4787+
List<Entry> entries = cursor.readEntries(2);
4788+
for (Entry entry : entries) {
4789+
cursor.delete(entry.getPosition());
4790+
}
4791+
assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition());
4792+
4793+
// read the next 6 entry and not delete, MarkDeletedPosition not move forward
4794+
entries = cursor.readEntries(6);
4795+
assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition());
4796+
4797+
// delete last read entry, MarkDeletedPosition not move forward
4798+
Entry lastEntry = entries.get(entries.size() - 1);
4799+
cursor.delete(lastEntry.getPosition());
4800+
assertEquals(PositionFactory.create(ledgerId, 1), cursor.getMarkDeletedPosition());
4801+
4802+
// call skip entries, MarkDeletedPosition move forward
4803+
cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(),
4804+
PositionFactory.create(ledgerId, lastEntry.getEntryId()));
4805+
assertEquals(PositionFactory.create(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition());
4806+
4807+
// repeat call skip entries, MarkDeletedPosition not change
4808+
cursor.skipNonRecoverableEntries(cursor.getMarkDeletedPosition(),
4809+
PositionFactory.create(ledgerId, lastEntry.getEntryId()));
4810+
assertEquals(PositionFactory.create(ledgerId, lastEntry.getEntryId()), cursor.getMarkDeletedPosition());
4811+
4812+
cursor.close();
4813+
ledger.close();
4814+
}
4815+
47694816
@Test
47704817
public void testRecoverCursorWithTerminateManagedLedger() throws Exception {
47714818
String mlName = "my_test_ledger";

0 commit comments

Comments
 (0)