Skip to content

Commit 1617bb2

Browse files
authored
[fix][broker] Fix markDeletedPosition race condition in ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() method (#25110)
1 parent d72dc04 commit 1617bb2

File tree

6 files changed

+287
-72
lines changed

6 files changed

+287
-72
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,14 +1250,17 @@ public boolean hasMoreEntries() {
12501250

12511251
@Override
12521252
public long getNumberOfEntries() {
1253-
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
1253+
Position readPos = readPosition;
1254+
Position lastPosition = ledger.getLastPosition();
1255+
Position nextPosition = lastPosition.getNext();
1256+
if (readPos.compareTo(nextPosition) > 0) {
12541257
if (log.isDebugEnabled()) {
12551258
log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read",
1256-
ledger.getName(), name, readPosition, ledger.getLastPosition());
1259+
ledger.getName(), name, readPos, lastPosition);
12571260
}
12581261
return 0;
12591262
} else {
1260-
return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext()));
1263+
return getNumberOfEntries(Range.closedOpen(readPos, nextPosition));
12611264
}
12621265
}
12631266

@@ -2255,25 +2258,27 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
22552258
}
22562259

22572260
Position newPosition = ackBatchPosition(position);
2258-
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
2261+
Position markDeletePos = markDeletePosition;
2262+
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
2263+
if (lastConfirmedEntry.compareTo(newPosition) < 0) {
22592264
boolean shouldCursorMoveForward = false;
22602265
try {
2261-
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
2262-
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
2266+
long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
2267+
Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
22632268
shouldCursorMoveForward = nextValidLedger != null
2264-
&& (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
2269+
&& (markDeletePos.getEntryId() + 1 >= ledgerEntries)
22652270
&& (newPosition.getLedgerId() == nextValidLedger);
22662271
} catch (Exception e) {
22672272
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
22682273
}
22692274

22702275
if (shouldCursorMoveForward) {
22712276
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
2272-
ledger.getName(), markDeletePosition, newPosition);
2277+
ledger.getName(), markDeletePos, newPosition);
22732278
} else {
22742279
if (log.isDebugEnabled()) {
22752280
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
2276-
+ " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
2281+
+ " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name);
22772282
}
22782283
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
22792284
return;
@@ -2329,11 +2334,15 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map<String, L
23292334
final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) {
23302335
ledger.mbean.addMarkDeleteOp();
23312336

2332-
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx,
2333-
alignAcknowledgeStatusAfterPersisted);
2334-
23352337
// We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
23362338
synchronized (pendingMarkDeleteOps) {
2339+
// use given properties or when missing, use the properties from the previous field value
2340+
MarkDeleteEntry last = pendingMarkDeleteOps.peekLast();
2341+
Map<String, Long> propertiesToUse =
2342+
properties != null ? properties : (last != null ? last.properties : getProperties());
2343+
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx,
2344+
alignAcknowledgeStatusAfterPersisted);
2345+
23372346
// The state might have changed while we were waiting on the queue mutex
23382347
switch (state) {
23392348
case Closed:
@@ -2701,17 +2710,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
27012710
// update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
27022711
private void updateLastMarkDeleteEntryToLatest(final Position newPosition,
27032712
final Map<String, Long> properties) {
2704-
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
2705-
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
2706-
// keep current value, don't update
2707-
return last;
2708-
} else {
2709-
// use given properties or when missing, use the properties from the previous field value
2710-
Map<String, Long> propertiesToUse =
2711-
properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
2712-
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
2713-
}
2714-
});
2713+
synchronized (pendingMarkDeleteOps) {
2714+
// use given properties or when missing, use the properties from the previous field value
2715+
MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast();
2716+
Map<String, Long> propertiesToUse =
2717+
properties != null ? properties : (lastPending != null ? lastPending.properties : getProperties());
2718+
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
2719+
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
2720+
// keep current value, don't update
2721+
return last;
2722+
} else {
2723+
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
2724+
}
2725+
});
2726+
}
27152727
}
27162728

27172729
/**

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -481,14 +481,15 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
481481
public void initializeComplete() {
482482
log.info("[{}] Successfully initialize managed ledger", name);
483483
pendingInitializeLedgers.remove(name, pendingLedger);
484-
future.complete(newledger);
485-
486-
// May need to update the cursor position
487-
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
488-
// May need to trigger offloading
489-
if (config.isTriggerOffloadOnTopicLoad()) {
490-
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
491-
}
484+
// May need to update the cursor position and wait them finished
485+
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> {
486+
// ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger
487+
future.complete(newledger);
488+
// May need to trigger offloading
489+
if (config.isTriggerOffloadOnTopicLoad()) {
490+
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
491+
}
492+
});
492493
}
493494

494495
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,11 +1761,10 @@ public void operationComplete(Void v, Stat stat) {
17611761
updateLedgersIdsComplete(originalCurrentLedger);
17621762
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
17631763
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
1764+
// May need to update the cursor position
1765+
maybeUpdateCursorBeforeTrimmingConsumedLedger();
17641766
}
17651767
metadataMutex.unlock();
1766-
1767-
// May need to update the cursor position
1768-
maybeUpdateCursorBeforeTrimmingConsumedLedger();
17691768
}
17701769

17711770
@Override
@@ -2709,18 +2708,23 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
27092708
this.waitingEntryCallBacks.add(cb);
27102709
}
27112710

2712-
public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
2711+
public CompletableFuture<Void> maybeUpdateCursorBeforeTrimmingConsumedLedger() {
2712+
List<CompletableFuture<Void>> cursorMarkDeleteFutures = new ArrayList<>();
27132713
for (ManagedCursor cursor : cursors) {
2714-
Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null
2715-
? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition();
2716-
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
2714+
CompletableFuture<Void> future = new CompletableFuture<>();
2715+
cursorMarkDeleteFutures.add(future);
2716+
2717+
// Snapshot positions into a local variables to avoid race condition.
2718+
Position markDeletedPosition = cursor.getMarkDeletedPosition();
2719+
Position lastAckedPosition = markDeletedPosition;
2720+
LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
27172721
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
27182722
.map(Map.Entry::getValue).orElse(null);
27192723

2720-
if (currPointedLedger != null) {
2724+
if (curPointedLedger != null) {
27212725
if (nextPointedLedger != null) {
27222726
if (lastAckedPosition.getEntryId() != -1
2723-
&& lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
2727+
&& lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) {
27242728
lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
27252729
}
27262730
} else {
@@ -2730,25 +2734,37 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
27302734
log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
27312735
}
27322736

2733-
if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
2737+
int compareResult = lastAckedPosition.compareTo(markDeletedPosition);
2738+
if (compareResult > 0) {
27342739
Position finalPosition = lastAckedPosition;
2735-
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
2736-
cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(),
2737-
new MarkDeleteCallback() {
2738-
@Override
2739-
public void markDeleteComplete(Object ctx) {
2740-
log.info("Successfully persisted cursor position for cursor:{} to {}",
2741-
cursor, finalPosition);
2742-
}
2740+
log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor,
2741+
markDeletedPosition, lastAckedPosition);
2742+
cursor.asyncMarkDelete(lastAckedPosition, null, new MarkDeleteCallback() {
2743+
@Override
2744+
public void markDeleteComplete(Object ctx) {
2745+
log.info("Successfully persisted cursor position for cursor:{} to {}", cursor, finalPosition);
2746+
future.complete(null);
2747+
}
27432748

2744-
@Override
2745-
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
2746-
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
2747-
cursor, cursor.getMarkDeletedPosition(), finalPosition, exception);
2748-
}
2749-
}, null);
2749+
@Override
2750+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
2751+
log.warn("Failed to mark delete: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(),
2752+
finalPosition, exception);
2753+
future.completeExceptionally(exception);
2754+
}
2755+
}, null);
2756+
} else if (compareResult == 0) {
2757+
log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.",
2758+
cursor, markDeletedPosition);
2759+
future.complete(null);
2760+
} else {
2761+
// Should not happen
2762+
log.warn("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:"
2763+
+ " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition);
2764+
future.complete(null);
27502765
}
27512766
}
2767+
return FutureUtil.waitForAll(cursorMarkDeleteFutures);
27522768
}
27532769

27542770
private void trimConsumedLedgersInBackground() {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,13 @@ void recover(final VoidCallback callback) {
103103
protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties,
104104
final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) {
105105
// Bypass persistence of mark-delete position and individually deleted messages info
106-
107-
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx,
108-
alignAcknowledgeStatusAfterPersisted);
106+
MarkDeleteEntry mdEntry;
109107
lock.writeLock().lock();
110108
try {
109+
// use given properties or when missing, use the properties from the previous field value
110+
Map<String, Long> propertiesToUse = properties != null ? properties : getProperties();
111+
mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx,
112+
alignAcknowledgeStatusAfterPersisted);
111113
lastMarkDeleteEntry = mdEntry;
112114
mdEntry.alignAcknowledgeStatus();
113115
} finally {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,8 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
407407
ml.close();
408408
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
409409
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
410-
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
410+
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
411+
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry);
411412

412413
// cleanup.
413414
ml.delete();
@@ -498,12 +499,18 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
498499
assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
499500
assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
500501
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
502+
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
503+
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
501504

502505
// Verify the mark delete position can be recovered properly.
503506
ml.close();
504507
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
505508
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
506-
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
509+
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
510+
// If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0}
511+
// does not exist in the managed-ledger. Recovered cursor's position will not be moved forward.
512+
// TODO should be handled in ledger trim process.
513+
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
507514

508515
// cleanup.
509516
ml.delete();
@@ -4441,7 +4448,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
44414448
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
44424449
ManagedCursor c2 = ledger2.openCursor("c");
44434450

4444-
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
4451+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
44454452
});
44464453
}
44474454

@@ -4500,7 +4507,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
45004507
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config);
45014508
ManagedCursor c2 = ledger2.openCursor("c");
45024509

4503-
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
4510+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
45044511
});
45054512
}
45064513

@@ -4552,7 +4559,7 @@ public void testFlushCursorAfterError() throws Exception {
45524559
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
45534560
ManagedCursor c2 = ledger2.openCursor("c");
45544561

4555-
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
4562+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
45564563
});
45574564
}
45584565

@@ -4815,7 +4822,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
48154822
}
48164823

48174824
@Test
4818-
public void testLazyCursorLedgerCreation() throws Exception {
4825+
public void testEagerCursorLedgerCreation() throws Exception {
48194826
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
48204827
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
48214828
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
@@ -4840,8 +4847,8 @@ public void testLazyCursorLedgerCreation() throws Exception {
48404847
ledger = (ManagedLedgerImpl) factory
48414848
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
48424849
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
4843-
assertEquals(cursor1.getState(), "NoLedger");
4844-
assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
4850+
assertEquals(cursor1.getState(), "Open");
4851+
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition);
48454852

48464853
// Verify the recovered cursor can work with new mark delete.
48474854
lastPosition = null;

0 commit comments

Comments
 (0)