106106import org .apache .bookkeeper .mledger .util .ManagedLedgerUtils ;
107107import org .apache .commons .lang3 .mutable .MutableBoolean ;
108108import org .apache .commons .lang3 .mutable .MutableInt ;
109+ import org .apache .commons .lang3 .mutable .MutableLong ;
109110import org .apache .commons .lang3 .tuple .Pair ;
110111import org .apache .pulsar .common .policies .data .ManagedLedgerInternalStats ;
111112import org .apache .pulsar .common .util .DateFormatter ;
@@ -209,7 +210,7 @@ public class ManagedCursorImpl implements ManagedCursor {
209210 @ Getter
210211 @ VisibleForTesting
211212 @ Nullable protected final ConcurrentSkipListMap <Position , BitSet > batchDeletedIndexes ;
212- private final ReadWriteLock lock = new ReentrantReadWriteLock ();
213+ protected final ReadWriteLock lock = new ReentrantReadWriteLock ();
213214
214215 private RateLimiter markDeleteLimiter ;
215216 // The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory,
@@ -239,6 +240,7 @@ class MarkDeleteEntry {
239240 final MarkDeleteCallback callback ;
240241 final Object ctx ;
241242 final Map <String , Long > properties ;
243+ final Runnable alignAcknowledgeStatusAfterPersisted ;
242244
243245 // If the callbackGroup is set, it means this mark-delete request was done on behalf of a group of request (just
244246 // persist the last one in the chain). In this case we need to trigger the callbacks for every request in the
@@ -247,10 +249,26 @@ class MarkDeleteEntry {
247249
248250 public MarkDeleteEntry (Position newPosition , Map <String , Long > properties ,
249251 MarkDeleteCallback callback , Object ctx ) {
252+ this (newPosition , properties , callback , ctx , null );
253+ }
254+
255+ public MarkDeleteEntry (Position newPosition , Map <String , Long > properties ,
256+ MarkDeleteCallback callback , Object ctx , Runnable alignAcknowledgeStatusAfterPersisted ) {
257+ if (alignAcknowledgeStatusAfterPersisted == null ) {
258+ alignAcknowledgeStatusAfterPersisted = () -> {
259+ if (batchDeletedIndexes != null ) {
260+ batchDeletedIndexes .subMap (PositionFactory .EARLIEST ,
261+ false , PositionFactory .create (newPosition .getLedgerId (),
262+ newPosition .getEntryId ()), true ).clear ();
263+ }
264+ persistentMarkDeletePosition = newPosition ;
265+ };
266+ }
250267 this .newPosition = newPosition ;
251268 this .properties = properties ;
252269 this .callback = callback ;
253270 this .ctx = ctx ;
271+ this .alignAcknowledgeStatusAfterPersisted = alignAcknowledgeStatusAfterPersisted ;
254272 }
255273
256274 public void triggerComplete () {
@@ -268,6 +286,10 @@ public void triggerComplete() {
268286 }
269287 }
270288
289+ public void alignAcknowledgeStatus () {
290+ this .alignAcknowledgeStatusAfterPersisted .run ();
291+ }
292+
271293 public void triggerFailed (ManagedLedgerException exception ) {
272294 if (callbackGroup != null ) {
273295 for (MarkDeleteEntry e : callbackGroup ) {
@@ -1483,47 +1505,63 @@ protected void internalResetCursor(Position proposedReadPosition,
14831505
14841506 final Position newMarkDeletePosition = ledger .getPreviousPosition (newReadPosition );
14851507
1508+ Runnable alignAcknowledgeStatusAfterPersisted = () -> {
1509+ // Correct the variable "messagesConsumedCounter".
1510+ // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1511+ // old one.
1512+ int compareRes = ledger .comparePositions (markDeletePosition , newMarkDeletePosition );
1513+ if (compareRes > 0 ) {
1514+ MSG_CONSUMED_COUNTER_UPDATER .addAndGet (cursorImpl (), -getNumberOfEntries (
1515+ Range .openClosed (newMarkDeletePosition , markDeletePosition )));
1516+ } else if (compareRes < 0 ) {
1517+ long entries = getNumberOfEntries (Range .openClosed (markDeletePosition , newMarkDeletePosition ));
1518+ MSG_CONSUMED_COUNTER_UPDATER .addAndGet (ManagedCursorImpl .this , entries );
1519+ }
1520+ individualDeletedMessages .removeAtMost (newMarkDeletePosition .getLedgerId (),
1521+ newMarkDeletePosition .getEntryId ());
1522+
1523+ // Entries already acknowledged, which is larger than the new mark deleted position.
1524+ MutableLong ackedEntriesAfterMdPosition = new MutableLong ();
1525+ individualDeletedMessages .forEach ((r ) -> {
1526+ for (long i = r .lowerEndpoint ().getEntryId () + 1 ; i <= r .upperEndpoint ().getEntryId (); i ++) {
1527+ ackedEntriesAfterMdPosition .incrementAndGet ();
1528+ }
1529+ return true ;
1530+ });
1531+ MSG_CONSUMED_COUNTER_UPDATER .addAndGet (ManagedCursorImpl .this ,
1532+ -ackedEntriesAfterMdPosition .get ().longValue ());
1533+ markDeletePosition = newMarkDeletePosition ;
1534+ lastMarkDeleteEntry = new MarkDeleteEntry (newMarkDeletePosition , isCompactionCursor ()
1535+ ? getProperties () : Collections .emptyMap (), null , null );
1536+ individualDeletedMessages .clear ();
1537+ if (batchDeletedIndexes != null ) {
1538+ batchDeletedIndexes .clear ();
1539+ AckSetStateUtil .maybeGetAckSetState (newReadPosition ).ifPresent (ackSetState -> {
1540+ long [] resetWords = ackSetState .getAckSet ();
1541+ if (resetWords != null ) {
1542+ batchDeletedIndexes .put (newReadPosition , BitSet .valueOf (resetWords ));
1543+ }
1544+ });
1545+ }
1546+
1547+ Position oldReadPosition = readPosition ;
1548+ if (oldReadPosition .compareTo (newReadPosition ) >= 0 ) {
1549+ log .info ("[{}] reset readPosition to {} before current read readPosition {} on cursor {}" ,
1550+ ledger .getName (), newReadPosition , oldReadPosition , name );
1551+ } else {
1552+ log .info ("[{}] reset readPosition to {} skipping from current read readPosition {} on "
1553+ + "cursor {}" , ledger .getName (), newReadPosition , oldReadPosition , name );
1554+ }
1555+ readPosition = newReadPosition ;
1556+ };
1557+
14861558 VoidCallback finalCallback = new VoidCallback () {
14871559 @ Override
14881560 public void operationComplete () {
14891561
14901562 // modify mark delete and read position since we are able to persist new position for cursor
14911563 lock .writeLock ().lock ();
14921564 try {
1493- // Correct the variable "messagesConsumedCounter".
1494- // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the
1495- // old one.
1496- int compareRes = ledger .comparePositions (markDeletePosition , newMarkDeletePosition );
1497- if (compareRes > 0 ) {
1498- MSG_CONSUMED_COUNTER_UPDATER .addAndGet (cursorImpl (), -getNumberOfEntries (
1499- Range .openClosed (newMarkDeletePosition , markDeletePosition )));
1500- } else if (compareRes < 0 ) {
1501- MSG_CONSUMED_COUNTER_UPDATER .addAndGet (cursorImpl (), getNumberOfEntries (
1502- Range .openClosed (markDeletePosition , newMarkDeletePosition )));
1503- }
1504- markDeletePosition = newMarkDeletePosition ;
1505- lastMarkDeleteEntry = new MarkDeleteEntry (newMarkDeletePosition , isCompactionCursor ()
1506- ? getProperties () : Collections .emptyMap (), null , null );
1507- individualDeletedMessages .clear ();
1508- if (batchDeletedIndexes != null ) {
1509- batchDeletedIndexes .clear ();
1510- AckSetStateUtil .maybeGetAckSetState (newReadPosition ).ifPresent (ackSetState -> {
1511- long [] resetWords = ackSetState .getAckSet ();
1512- if (resetWords != null ) {
1513- batchDeletedIndexes .put (newReadPosition , BitSet .valueOf (resetWords ));
1514- }
1515- });
1516- }
1517-
1518- Position oldReadPosition = readPosition ;
1519- if (oldReadPosition .compareTo (newReadPosition ) >= 0 ) {
1520- log .info ("[{}] reset readPosition to {} before current read readPosition {} on cursor {}" ,
1521- ledger .getName (), newReadPosition , oldReadPosition , name );
1522- } else {
1523- log .info ("[{}] reset readPosition to {} skipping from current read readPosition {} on "
1524- + "cursor {}" , ledger .getName (), newReadPosition , oldReadPosition , name );
1525- }
1526- readPosition = newReadPosition ;
15271565 ledger .onCursorReadPositionUpdated (ManagedCursorImpl .this , newReadPosition );
15281566 } finally {
15291567 lock .writeLock ().unlock ();
@@ -1567,7 +1605,7 @@ public void markDeleteComplete(Object ctx) {
15671605 public void markDeleteFailed (ManagedLedgerException exception , Object ctx ) {
15681606 finalCallback .operationFailed (exception );
15691607 }
1570- }, null );
1608+ }, null , alignAcknowledgeStatusAfterPersisted );
15711609 }
15721610
15731611 @ Override
@@ -2186,7 +2224,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
21862224 callback .markDeleteComplete (ctx );
21872225 return ;
21882226 }
2189- internalAsyncMarkDelete (newPosition , properties , callback , ctx );
2227+ internalAsyncMarkDelete (newPosition , properties , callback , ctx , null );
21902228 }
21912229
21922230 private Position ackBatchPosition (Position position ) {
@@ -2215,10 +2253,11 @@ private Position ackBatchPosition(Position position) {
22152253 }
22162254
22172255 protected void internalAsyncMarkDelete (final Position newPosition , Map <String , Long > properties ,
2218- final MarkDeleteCallback callback , final Object ctx ) {
2256+ final MarkDeleteCallback callback , final Object ctx , Runnable alignAcknowledgeStatusAfterPersisted ) {
22192257 ledger .mbean .addMarkDeleteOp ();
22202258
2221- MarkDeleteEntry mdEntry = new MarkDeleteEntry (newPosition , properties , callback , ctx );
2259+ MarkDeleteEntry mdEntry = new MarkDeleteEntry (newPosition , properties , callback , ctx ,
2260+ alignAcknowledgeStatusAfterPersisted );
22222261
22232262 // We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
22242263 synchronized (pendingMarkDeleteOps ) {
@@ -2317,14 +2356,7 @@ public void operationComplete() {
23172356 // point.
23182357 lock .writeLock ().lock ();
23192358 try {
2320- individualDeletedMessages .removeAtMost (mdEntry .newPosition .getLedgerId (),
2321- mdEntry .newPosition .getEntryId ());
2322- if (batchDeletedIndexes != null ) {
2323- batchDeletedIndexes .subMap (PositionFactory .EARLIEST ,
2324- false , PositionFactory .create (mdEntry .newPosition .getLedgerId (),
2325- mdEntry .newPosition .getEntryId ()), true ).clear ();
2326- }
2327- persistentMarkDeletePosition = mdEntry .newPosition ;
2359+ mdEntry .alignAcknowledgeStatus ();
23282360 } finally {
23292361 lock .writeLock ().unlock ();
23302362 }
@@ -2581,7 +2613,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
25812613 callback .deleteFailed (exception , ctx );
25822614 }
25832615
2584- }, ctx );
2616+ }, ctx , null );
25852617
25862618 } catch (Exception e ) {
25872619 log .warn ("[{}] [{}] Error doing asyncDelete [{}]" , ledger .getName (), name , e .getMessage (), e );
0 commit comments