From d0ba81df005a84a1239824e25c5bf773f7f56011 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 19 Dec 2025 23:17:27 +0800 Subject: [PATCH 01/17] [fix][test] Fix ManagedCursorTest.asyncMarkDeleteBlocking() flaky test --- .../mledger/impl/ManagedLedgerImpl.java | 10 ++- .../mledger/impl/ManagedCursorTest.java | 75 ++++++++++++------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4b278cf6664d4..50002fa8bd8ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2713,26 +2713,28 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { for (ManagedCursor cursor : cursors) { Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null ? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition(); - LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); + LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); - if (currPointedLedger != null) { + if (curPointedLedger != null) { if (nextPointedLedger != null) { if (lastAckedPosition.getEntryId() != -1 - && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) { + && lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) { lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1); } } else { log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor); } } else { + // todo: no ledger exists, should we move cursor mark deleted position to nextPointedLedger:-1 log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } - if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) { + if (lastAckedPosition.compareTo(cursor.getMarkDeletedPosition()) > 0) { Position finalPosition = lastAckedPosition; log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); + // todo since this is an async method, should we make the caller a callback method too? cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d56352c119795..6c0f1e7943594 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -49,6 +49,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -60,6 +61,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -1248,7 +1250,8 @@ void testRemoveCursorFail() throws Exception { @Test(timeOut = 20000) void cursorPersistence() throws Exception { - ManagedLedger ledger = factory.open("my_test_ledger"); + // Open my_test_ledger ledger, create ledger 3 + ManagedLedger ledger = factory.open("cursor_persistence_ledger"); ManagedCursor c1 = ledger.openCursor("c1"); ManagedCursor c2 = ledger.openCursor("c2"); ledger.addEntry("dummy-entry-1".getBytes(Encoding)); @@ -1260,26 +1263,29 @@ void cursorPersistence() throws Exception { List entries = c1.readEntries(3); Position p1 = entries.get(2).getPosition(); + // Mark delete, create ledger 4 due to cursor ledger state is NoLedger c1.markDelete(p1); entries.forEach(Entry::release); entries = c1.readEntries(4); Position p2 = entries.get(2).getPosition(); + // Mark delete, create ledger 5 due to cursor ledger state is NoLedger c2.markDelete(p2); entries.forEach(Entry::release); // Reopen - @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + // Recovery open my_test_ledger ledger, create ledger 6, and move mark delete position to 6:-1 + // See PR https://github.com/apache/pulsar/pull/25087 + ledger = factory2.open("cursor_persistence_ledger"); c1 = ledger.openCursor("c1"); c2 = ledger.openCursor("c2"); assertEquals(c1.getMarkDeletedPosition(), p1); - // move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed ManagedCursor finalC2 = c2; - Awaitility.await().untilAsserted(() -> assertNotEquals(finalC2.getMarkDeletedPosition(), p2)); + Awaitility.await() + .untilAsserted(() -> assertEquals(finalC2.getMarkDeletedPosition(), new ImmutablePositionImpl(6, -1))); } @Test(timeOut = 20000) @@ -1350,17 +1356,22 @@ void cursorPersistence2() throws Exception { assertEquals(c4.getMarkDeletedPosition(), p1); } - @Test(groups = "flaky") + @Test(invocationCount = 100) public void asyncMarkDeleteBlocking() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); - config.setMetadataMaxEntriesPerLedger(5); - ManagedLedger ledger = factory.open("my_test_ledger", config); + // open my_test_ledger ledger, create ledger 3 + ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); final ManagedCursor c1 = ledger.openCursor("c1"); - final AtomicReference lastPosition = new AtomicReference(); - - final int num = 100; - final CountDownLatch latch = new CountDownLatch(num); + final AtomicReference lastPosition = new AtomicReference<>(); + // just for log debug purpose + Deque positions = new ConcurrentLinkedDeque<>(); + + // In previous flaky test, we set num = 100, this will make the test flaky. + // Here, we set num to 101, make sure the ledger 13 is created. + final int num = 101; + final CountDownLatch addEntryLatch = new CountDownLatch(num); + // 10 entries per ledger, create ledger 4~13 for (int i = 0; i < num; i++) { ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { @Override @@ -1370,31 +1381,45 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { lastPosition.set(position); - c1.asyncMarkDelete(position, new MarkDeleteCallback() { - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - } - - @Override - public void markDeleteComplete(Object ctx) { - latch.countDown(); - } - }, null); + positions.offer(position); + addEntryLatch.countDown(); } }, null); } + addEntryLatch.await(); + assertEquals(lastPosition.get(), new ImmutablePositionImpl(13, 0)); - latch.await(); + // If we set num = 100, to avoid flaky test, we should add Thread.sleep(1000) here. + // Let trim ledger process complete, so the ledger 13 will be created with no entry + + final CountDownLatch markDeleteLatch = new CountDownLatch(1); + // mark delete, create ledger 14 due to cursor ledger state is NoLedger + c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + @Override + public void markDeleteComplete(Object ctx) { + markDeleteLatch.countDown(); + } + }, null); + markDeleteLatch.await(); assertEquals(c1.getNumberOfEntries(), 0); // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + // Recovery open my_test_ledger ledger, create ledger 15, and move mark delete position to 15:-1 + // See PR https://github.com/apache/pulsar/pull/25087 + ledger = factory2.open("async_mark_delete_blocking_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); + log.info("positions size: {}, positions: {}", positions.size(), positions); + // To make sure maybeUpdateCursorBeforeTrimmingConsumedLedger is completed, we should wait until + // c2.getMarkDeletedPosition() equals 15:-1, also see PR https://github.com/apache/pulsar/pull/25087 + Awaitility.await() + .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), new ImmutablePositionImpl(15, -1))); } @Test(timeOut = 20000) From 51224d42205c9932bcb2e074887c2a8184432d60 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sat, 20 Dec 2025 19:37:37 +0800 Subject: [PATCH 02/17] [fix][test] Add comments --- .../mledger/impl/ManagedLedgerImpl.java | 4 +-- .../mledger/impl/ManagedCursorTest.java | 30 ++++++++++++------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 50002fa8bd8ee..c4eaca8533865 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2727,14 +2727,14 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor); } } else { - // todo: no ledger exists, should we move cursor mark deleted position to nextPointedLedger:-1 + // TODO no ledger exists, should we move cursor mark deleted position to nextPointedLedger:-1 log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } if (lastAckedPosition.compareTo(cursor.getMarkDeletedPosition()) > 0) { Position finalPosition = lastAckedPosition; log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); - // todo since this is an async method, should we make the caller a callback method too? + // TODO since this is an async method, should we make the caller a callback method too? cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 6c0f1e7943594..c66db72714294 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1250,7 +1250,7 @@ void testRemoveCursorFail() throws Exception { @Test(timeOut = 20000) void cursorPersistence() throws Exception { - // Open my_test_ledger ledger, create ledger 3 + // Open cursor_persistence_ledger ledger, create ledger 3 ManagedLedger ledger = factory.open("cursor_persistence_ledger"); ManagedCursor c1 = ledger.openCursor("c1"); ManagedCursor c2 = ledger.openCursor("c2"); @@ -1276,7 +1276,7 @@ void cursorPersistence() throws Exception { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - // Recovery open my_test_ledger ledger, create ledger 6, and move mark delete position to 6:-1 + // Recovery open cursor_persistence_ledger ledger, create ledger 6, and move mark delete position to 6:-1 // See PR https://github.com/apache/pulsar/pull/25087 ledger = factory2.open("cursor_persistence_ledger"); c1 = ledger.openCursor("c1"); @@ -1356,19 +1356,26 @@ void cursorPersistence2() throws Exception { assertEquals(c4.getMarkDeletedPosition(), p1); } - @Test(invocationCount = 100) + @Test(timeOut = 20000) public void asyncMarkDeleteBlocking() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); - // open my_test_ledger ledger, create ledger 3 + // open async_mark_delete_blocking_test_ledger ledger, create ledger 3 ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); final ManagedCursor c1 = ledger.openCursor("c1"); final AtomicReference lastPosition = new AtomicReference<>(); // just for log debug purpose Deque positions = new ConcurrentLinkedDeque<>(); - // In previous flaky test, we set num = 100, this will make the test flaky. - // Here, we set num to 101, make sure the ledger 13 is created. + // In previous flaky test, we set num = 100, this will make the test flaky. We create 10 ledgers, + // and last ledger 12 is full. When executing maybeUpdateCursorBeforeTrimmingConsumedLedger method, + // Ledger 12 may be rolled over or not: + // 1. Ledger 12 is not rolled over, so the curPointedLedger 12, the nextPointedLedger is 13 with no entry, + // we will move mark delete position to 15:-1 + // 2. Ledger 13 is the active ledger with no entry. so the curPointedLedger is null, + // we will keep the original mark delete position 12:9 + // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger, so it will + // not be rolled over, the curPointedLedger is 13:0, and will never be null after ledger reopened final int num = 101; final CountDownLatch addEntryLatch = new CountDownLatch(num); // 10 entries per ledger, create ledger 4~13 @@ -1408,16 +1415,17 @@ public void markDeleteComplete(Object ctx) { assertEquals(c1.getNumberOfEntries(), 0); // Reopen - @Cleanup("shutdown") - ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - // Recovery open my_test_ledger ledger, create ledger 15, and move mark delete position to 15:-1 - // See PR https://github.com/apache/pulsar/pull/25087 + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15. + // When executing maybeUpdateCursorBeforeTrimmingConsumedLedger method, the curPointedLedger is 13, + // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 13:0, + // so we will move mark delete position to 15:-1, see PR https://github.com/apache/pulsar/pull/25087 ledger = factory2.open("async_mark_delete_blocking_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); log.info("positions size: {}, positions: {}", positions.size(), positions); // To make sure maybeUpdateCursorBeforeTrimmingConsumedLedger is completed, we should wait until - // c2.getMarkDeletedPosition() equals 15:-1, also see PR https://github.com/apache/pulsar/pull/25087 + // c2.getMarkDeletedPosition() equals 15:-1, see also PR https://github.com/apache/pulsar/pull/25087 Awaitility.await() .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), new ImmutablePositionImpl(15, -1))); } From 8aaafc129739de60baaae8058e3642362d301abf Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sat, 20 Dec 2025 23:53:02 +0800 Subject: [PATCH 03/17] [fix][test] Analyze root cause, we are almost there --- .../mledger/impl/ManagedCursorTest.java | 58 ++++++++++++------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c66db72714294..18d956819c71c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1250,7 +1250,7 @@ void testRemoveCursorFail() throws Exception { @Test(timeOut = 20000) void cursorPersistence() throws Exception { - // Open cursor_persistence_ledger ledger, create ledger 3 + // Open cursor_persistence_ledger ledger, create ledger 3. ManagedLedger ledger = factory.open("cursor_persistence_ledger"); ManagedCursor c1 = ledger.openCursor("c1"); ManagedCursor c2 = ledger.openCursor("c2"); @@ -1263,21 +1263,21 @@ void cursorPersistence() throws Exception { List entries = c1.readEntries(3); Position p1 = entries.get(2).getPosition(); - // Mark delete, create ledger 4 due to cursor ledger state is NoLedger + // Mark delete, create ledger 4 due to cursor ledger state is NoLedger. c1.markDelete(p1); entries.forEach(Entry::release); entries = c1.readEntries(4); Position p2 = entries.get(2).getPosition(); - // Mark delete, create ledger 5 due to cursor ledger state is NoLedger + // Mark delete, create ledger 5 due to cursor ledger state is NoLedger. c2.markDelete(p2); entries.forEach(Entry::release); // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - // Recovery open cursor_persistence_ledger ledger, create ledger 6, and move mark delete position to 6:-1 - // See PR https://github.com/apache/pulsar/pull/25087 + // Recovery open cursor_persistence_ledger ledger, create ledger 6, and move mark delete position to 6:-1. + // See PR https://github.com/apache/pulsar/pull/25087. ledger = factory2.open("cursor_persistence_ledger"); c1 = ledger.openCursor("c1"); c2 = ledger.openCursor("c2"); @@ -1360,27 +1360,24 @@ void cursorPersistence2() throws Exception { public void asyncMarkDeleteBlocking() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); - // open async_mark_delete_blocking_test_ledger ledger, create ledger 3 + // open async_mark_delete_blocking_test_ledger ledger, create ledger 3. ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); final ManagedCursor c1 = ledger.openCursor("c1"); final AtomicReference lastPosition = new AtomicReference<>(); // just for log debug purpose Deque positions = new ConcurrentLinkedDeque<>(); - // In previous flaky test, we set num = 100, this will make the test flaky. We create 10 ledgers, - // and last ledger 12 is full. When executing maybeUpdateCursorBeforeTrimmingConsumedLedger method, - // Ledger 12 may be rolled over or not: - // 1. Ledger 12 is not rolled over, so the curPointedLedger 12, the nextPointedLedger is 13 with no entry, - // we will move mark delete position to 15:-1 - // 2. Ledger 13 is the active ledger with no entry. so the curPointedLedger is null, - // we will keep the original mark delete position 12:9 - // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger, so it will - // not be rolled over, the curPointedLedger is 13:0, and will never be null after ledger reopened + // In previous flaky test, we set num = 100, this will make the test flaky. + // 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9. + // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition null. + // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger, + // and cursor will always be recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0. final int num = 101; final CountDownLatch addEntryLatch = new CountDownLatch(num); // 10 entries per ledger, create ledger 4~13 for (int i = 0; i < num; i++) { - ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + String entryStr = "entry-" + i; + ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() { @Override public void addFailed(ManagedLedgerException exception, Object ctx) { } @@ -1397,10 +1394,27 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { assertEquals(lastPosition.get(), new ImmutablePositionImpl(13, 0)); // If we set num = 100, to avoid flaky test, we should add Thread.sleep(1000) here. - // Let trim ledger process complete, so the ledger 13 will be created with no entry + // c1 will be always recovered with markDeletePosition 12:9. + // Thread.sleep(1000); final CountDownLatch markDeleteLatch = new CountDownLatch(1); - // mark delete, create ledger 14 due to cursor ledger state is NoLedger + // Mark delete, create ledger 14 due to cursor ledger state is NoLedger. + // Flaky test case: markDelete operation is triggered twice: + // 1. first is triggered by c1.asyncMarkDelete, markDeletePosition is 12:9. + // 2. second is triggered by ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover, + // markDeletePosition is 13:-1, and we move persistentMarkDeletePosition and markDeletePosition to 13:-1 + // due to PR https://github.com/apache/pulsar/pull/25087. + // Before this pr, we will not move persistentMarkDeletePosition. + // Two markDelete operations is almost triggered at the same time without order guarantee: + // 1. main thread triggered c1.asyncMarkDelete. + // 2. bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggered create ledger 13 due to ledger full + // rollover by OpAddEntry. + // OpAddEntry will close and create a new ledger when closeWhenDone is true. + // In ManagedLedgerImpl class, MetaStoreCallback cb calls maybeUpdateCursorBeforeTrimmingConsumedLedger(), + // which calls cursor.asyncMarkDelete(), so markDelete operation in ledger rollover may execute after + // AddEntryCallback.addComplete(). The root cause is cursor.asyncMarkDelete() does not propagate completion or + // failure to it caller callback + // TODO the stack frame is every strange in case 2, seems infinitely recursive. c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @@ -1417,15 +1431,15 @@ public void markDeleteComplete(Object ctx) { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15. - // When executing maybeUpdateCursorBeforeTrimmingConsumedLedger method, the curPointedLedger is 13, + // When executing ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the curPointedLedger is 13, // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 13:0, - // so we will move mark delete position to 15:-1, see PR https://github.com/apache/pulsar/pull/25087 + // so we will move markDeletePosition to 15:-1, see PR https://github.com/apache/pulsar/pull/25087. ledger = factory2.open("async_mark_delete_blocking_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); log.info("positions size: {}, positions: {}", positions.size(), positions); - // To make sure maybeUpdateCursorBeforeTrimmingConsumedLedger is completed, we should wait until - // c2.getMarkDeletedPosition() equals 15:-1, see also PR https://github.com/apache/pulsar/pull/25087 + // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should + // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087. Awaitility.await() .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), new ImmutablePositionImpl(15, -1))); } From 539bd53a3c2349ebda4b6642d9949bf2d2569147 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 21 Dec 2025 07:51:16 +0800 Subject: [PATCH 04/17] [fix][test] Rollback asyncMarkDeleteBlocking flaky test, add asyncMarkDeleteBlockingWithMultiShots flaky test --- .../mledger/impl/ManagedLedgerImpl.java | 5 +- .../mledger/impl/ManagedCursorTest.java | 108 +++++++++++++++++- 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c4eaca8533865..97ccc2fbd9ad9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2717,6 +2717,10 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); + // TODO If PR https://github.com/apache/pulsar/pull/25087 is needed, + // may choose one of the following solutions: + // 1. Remove if (curPointedLedger != null) check, moving cursor to nextPointedLedgerId:-1 is not needed. + // 2. Make maybeUpdateCursorBeforeTrimmingConsumedLedger a callback method too to avoid race condition. if (curPointedLedger != null) { if (nextPointedLedger != null) { if (lastAckedPosition.getEntryId() != -1 @@ -2734,7 +2738,6 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { if (lastAckedPosition.compareTo(cursor.getMarkDeletedPosition()) > 0) { Position finalPosition = lastAckedPosition; log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); - // TODO since this is an async method, should we make the caller a callback method too? cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 18d956819c71c..e0c7d2873996a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1358,6 +1358,54 @@ void cursorPersistence2() throws Exception { @Test(timeOut = 20000) public void asyncMarkDeleteBlocking() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMetadataMaxEntriesPerLedger(5); + ManagedLedger ledger = factory.open("my_test_ledger", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + final AtomicReference lastPosition = new AtomicReference(); + + final int num = 100; + final CountDownLatch latch = new CountDownLatch(num); + for (int i = 0; i < num; i++) { + ledger.asyncAddEntry("entry".getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + lastPosition.set(position); + c1.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + latch.countDown(); + } + }, null); + } + }, null); + } + + latch.await(); + + assertEquals(c1.getNumberOfEntries(), 0); + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // flaky test case: may throw MetadataStoreException$BadVersionException, since rollover and recover may + // run at the same time, see PR https://github.com/apache/pulsar/pull/25087. + ledger = factory2.open("my_test_ledger"); + ManagedCursor c2 = ledger.openCursor("c1"); + + assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); + } + + @Test(timeOut = 20000) + public void asyncMarkDeleteBlockingWithOneShot() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); // open async_mark_delete_blocking_test_ledger ledger, create ledger 3. @@ -1367,7 +1415,8 @@ public void asyncMarkDeleteBlocking() throws Exception { // just for log debug purpose Deque positions = new ConcurrentLinkedDeque<>(); - // In previous flaky test, we set num = 100, this will make the test flaky. + // In previous flaky test, we set num = 100, PR https://github.com/apache/pulsar/pull/25087 will make the test + // more flaky. Flaky case: // 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9. // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition null. // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger, @@ -1444,6 +1493,63 @@ public void markDeleteComplete(Object ctx) { .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), new ImmutablePositionImpl(15, -1))); } + @Test(timeOut = 20000) + public void asyncMarkDeleteBlockingWithMultiShots() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMetadataMaxEntriesPerLedger(5); + // open async_mark_delete_blocking_test_ledger ledger, create ledger 3. + ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + final AtomicReference lastPosition = new AtomicReference<>(); + + final int num = 101; + final CountDownLatch addEntryLatch = new CountDownLatch(num); + // 10 entries per ledger, create ledger 5~14 + for (int i = 0; i < num; i++) { + String entryStr = "entry-" + i; + ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + lastPosition.set(position); + // Mark delete, create ledger 4 due to cursor ledger state is NoLedger. + c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + addEntryLatch.countDown(); + } + }, null); + + } + }, null); + } + addEntryLatch.await(); + // assertEquals(lastPosition.get(), new ImmutablePositionImpl(14, 0)); + assertEquals(c1.getNumberOfEntries(), 0); + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15. + ledger = factory2.open("async_mark_delete_blocking_test_ledger"); + ManagedCursor c2 = ledger.openCursor("c1"); + + // flaky test case: c2.getMarkDeletedPosition() may be larger than lastPositionLedgerId+1, since we don't + // know how many times will cursor.asyncMarkDelete() run in + // ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(). + // See PR https://github.com/apache/pulsar/pull/25087. + long lastPositionLedgerId = lastPosition.get().getLedgerId(); + Awaitility.await().untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), + new ImmutablePositionImpl(lastPositionLedgerId + 1, -1))); + } + @Test(timeOut = 20000) void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", From 0b890fe99733ee35186ef33842b878009a0e50f5 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 21 Dec 2025 09:47:43 +0800 Subject: [PATCH 05/17] [fix][test] Fix asyncMarkDeleteBlocking and asyncMarkDeleteBlockingWithMultiShots flaky test --- .../mledger/impl/ManagedLedgerImpl.java | 6 +-- .../mledger/impl/ManagedCursorTest.java | 46 +++++++++++++------ 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 97ccc2fbd9ad9..d3c0bb185d1eb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2717,10 +2717,6 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); - // TODO If PR https://github.com/apache/pulsar/pull/25087 is needed, - // may choose one of the following solutions: - // 1. Remove if (curPointedLedger != null) check, moving cursor to nextPointedLedgerId:-1 is not needed. - // 2. Make maybeUpdateCursorBeforeTrimmingConsumedLedger a callback method too to avoid race condition. if (curPointedLedger != null) { if (nextPointedLedger != null) { if (lastAckedPosition.getEntryId() != -1 @@ -2738,6 +2734,8 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { if (lastAckedPosition.compareTo(cursor.getMarkDeletedPosition()) > 0) { Position finalPosition = lastAckedPosition; log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); + // TODO If PR https://github.com/apache/pulsar/pull/25087 is needed, maybe we should make + // maybeUpdateCursorBeforeTrimmingConsumedLedger a callback method too to avoid race condition. cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index e0c7d2873996a..d832f2f5e7882 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1391,17 +1391,37 @@ public void markDeleteComplete(Object ctx) { } latch.await(); - assertEquals(c1.getNumberOfEntries(), 0); + // Sleep 1s here to wait ledger rollover finished + Thread.sleep(1000); + // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - // flaky test case: may throw MetadataStoreException$BadVersionException, since rollover and recover may - // run at the same time, see PR https://github.com/apache/pulsar/pull/25087. + // flaky test case: may throw MetadataStoreException$BadVersionException, race condition: + // 1. my_test_ledger ledger rollover triggers async cursor.asyncMarkDelete(). + // 2. factory2.open() triggers recovery read versionA ManagedLedgerInfo of my_test_ledger ledger. + // 3. cursor.asyncMarkDelete() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB ManagedLedgerInfo + // into metaStore. + // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo + // into metaStore, then throws BadVersionException and move my_test_ledger ledger to fenced state. + // See PR https://github.com/apache/pulsar/pull/25087. + // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++ ledger = factory2.open("my_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); + // Three cases: + // 1. cursor recovered with lastPosition markDeletePosition + // 2. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger not rolled over + // 3. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger rolled over + // See PR https://github.com/apache/pulsar/pull/25087. + log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); + long lastPositionLedgerId = lastPosition.get().getLedgerId(); + Awaitility.await().untilAsserted(() -> assertTrue( + c2.getMarkDeletedPosition().equals(lastPosition.get()) || + c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 2, -1)) + || c2.getMarkDeletedPosition() + .equals(new ImmutablePositionImpl(lastPositionLedgerId + 3, -1)))); } @Test(timeOut = 20000) @@ -1463,7 +1483,6 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { // which calls cursor.asyncMarkDelete(), so markDelete operation in ledger rollover may execute after // AddEntryCallback.addComplete(). The root cause is cursor.asyncMarkDelete() does not propagate completion or // failure to it caller callback - // TODO the stack frame is every strange in case 2, seems infinitely recursive. c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @@ -1498,14 +1517,12 @@ public void asyncMarkDeleteBlockingWithMultiShots() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); config.setMetadataMaxEntriesPerLedger(5); - // open async_mark_delete_blocking_test_ledger ledger, create ledger 3. ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); final ManagedCursor c1 = ledger.openCursor("c1"); final AtomicReference lastPosition = new AtomicReference<>(); final int num = 101; final CountDownLatch addEntryLatch = new CountDownLatch(num); - // 10 entries per ledger, create ledger 5~14 for (int i = 0; i < num; i++) { String entryStr = "entry-" + i; ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() { @@ -1516,7 +1533,6 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { lastPosition.set(position); - // Mark delete, create ledger 4 due to cursor ledger state is NoLedger. c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @@ -1532,22 +1548,22 @@ public void markDeleteComplete(Object ctx) { }, null); } addEntryLatch.await(); - // assertEquals(lastPosition.get(), new ImmutablePositionImpl(14, 0)); assertEquals(c1.getNumberOfEntries(), 0); // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15. ledger = factory2.open("async_mark_delete_blocking_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); - // flaky test case: c2.getMarkDeletedPosition() may be larger than lastPositionLedgerId+1, since we don't - // know how many times will cursor.asyncMarkDelete() run in - // ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(). + // flaky test case: c2.getMarkDeletedPosition() may be equals lastPositionLedgerId+1 or lastPositionLedgerId+2, + // the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover // See PR https://github.com/apache/pulsar/pull/25087. + log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); long lastPositionLedgerId = lastPosition.get().getLedgerId(); - Awaitility.await().untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), - new ImmutablePositionImpl(lastPositionLedgerId + 1, -1))); + Awaitility.await().untilAsserted(() -> assertTrue( + c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 1, -1)) + || c2.getMarkDeletedPosition() + .equals(new ImmutablePositionImpl(lastPositionLedgerId + 2, -1)))); } @Test(timeOut = 20000) From d338d6e22ddf9414a29dadaa9a218094ca27ca7d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 21 Dec 2025 10:00:08 +0800 Subject: [PATCH 06/17] [fix][test] Optimize comments --- .../mledger/impl/ManagedCursorTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d832f2f5e7882..694586912a593 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1399,12 +1399,12 @@ public void markDeleteComplete(Object ctx) { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); // flaky test case: may throw MetadataStoreException$BadVersionException, race condition: - // 1. my_test_ledger ledger rollover triggers async cursor.asyncMarkDelete(). - // 2. factory2.open() triggers recovery read versionA ManagedLedgerInfo of my_test_ledger ledger. + // 1. my_test_ledger ledger rollover triggers async cursor.asyncMarkDelete() operation. + // 2. factory2.open() triggers recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger. // 3. cursor.asyncMarkDelete() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB ManagedLedgerInfo // into metaStore. - // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo - // into metaStore, then throws BadVersionException and move my_test_ledger ledger to fenced state. + // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo + // into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state. // See PR https://github.com/apache/pulsar/pull/25087. // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++ ledger = factory2.open("my_test_ledger"); @@ -1435,7 +1435,7 @@ public void asyncMarkDeleteBlockingWithOneShot() throws Exception { // just for log debug purpose Deque positions = new ConcurrentLinkedDeque<>(); - // In previous flaky test, we set num = 100, PR https://github.com/apache/pulsar/pull/25087 will make the test + // In previous flaky test, we set num=100, PR https://github.com/apache/pulsar/pull/25087 will make the test // more flaky. Flaky case: // 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9. // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition null. @@ -1462,14 +1462,14 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { addEntryLatch.await(); assertEquals(lastPosition.get(), new ImmutablePositionImpl(13, 0)); - // If we set num = 100, to avoid flaky test, we should add Thread.sleep(1000) here. - // c1 will be always recovered with markDeletePosition 12:9. + // If we set num=100, to avoid flaky test, we should add Thread.sleep(1000) here to make sure ledger rollover + // is finished, but this sleep can not guarantee c1 always recovered with markDeletePosition 12:9. // Thread.sleep(1000); final CountDownLatch markDeleteLatch = new CountDownLatch(1); // Mark delete, create ledger 14 due to cursor ledger state is NoLedger. - // Flaky test case: markDelete operation is triggered twice: - // 1. first is triggered by c1.asyncMarkDelete, markDeletePosition is 12:9. + // The num=100 flaky test case, markDelete operation is triggered twice: + // 1. first is triggered by c1.asyncMarkDelete(), markDeletePosition is 12:9. // 2. second is triggered by ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover, // markDeletePosition is 13:-1, and we move persistentMarkDeletePosition and markDeletePosition to 13:-1 // due to PR https://github.com/apache/pulsar/pull/25087. From b497646c787853e907523bd679f2075d081b68ab Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 21 Dec 2025 10:15:18 +0800 Subject: [PATCH 07/17] [fix][test] Optimize comments, fix checkstyle --- .../mledger/impl/ManagedCursorTest.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 694586912a593..269ca7015e07c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1412,16 +1412,20 @@ public void markDeleteComplete(Object ctx) { // Three cases: // 1. cursor recovered with lastPosition markDeletePosition - // 2. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger not rolled over - // 3. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger rolled over + // 2. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger not rolled over, we + // move markDeletePosition to (lastPositionLegderId+2:-1) + // 3. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger rolled over, we + // move markDeletePosition to (lastPositionLegderId+3:-1) // See PR https://github.com/apache/pulsar/pull/25087. log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); long lastPositionLedgerId = lastPosition.get().getLedgerId(); - Awaitility.await().untilAsserted(() -> assertTrue( - c2.getMarkDeletedPosition().equals(lastPosition.get()) || - c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 2, -1)) - || c2.getMarkDeletedPosition() - .equals(new ImmutablePositionImpl(lastPositionLedgerId + 3, -1)))); + Awaitility.await().untilAsserted(() -> + assertTrue( + c2.getMarkDeletedPosition().equals(lastPosition.get()) + || c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 2, -1)) + || c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 3, -1)) + ) + ); } @Test(timeOut = 20000) @@ -1438,7 +1442,7 @@ public void asyncMarkDeleteBlockingWithOneShot() throws Exception { // In previous flaky test, we set num=100, PR https://github.com/apache/pulsar/pull/25087 will make the test // more flaky. Flaky case: // 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9. - // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition null. + // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1. // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger, // and cursor will always be recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0. final int num = 101; @@ -1471,8 +1475,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { // The num=100 flaky test case, markDelete operation is triggered twice: // 1. first is triggered by c1.asyncMarkDelete(), markDeletePosition is 12:9. // 2. second is triggered by ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover, - // markDeletePosition is 13:-1, and we move persistentMarkDeletePosition and markDeletePosition to 13:-1 - // due to PR https://github.com/apache/pulsar/pull/25087. + // The entries in ledger 12 are all consumed, and we move persistentMarkDeletePosition and + // markDeletePosition to 13:-1 due to PR https://github.com/apache/pulsar/pull/25087. // Before this pr, we will not move persistentMarkDeletePosition. // Two markDelete operations is almost triggered at the same time without order guarantee: // 1. main thread triggered c1.asyncMarkDelete. From 379ff6e1263a860abd16fbac35ac6f3255e5154f Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 21 Dec 2025 18:11:18 +0800 Subject: [PATCH 08/17] [fix][test] Optimize comments --- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 269ca7015e07c..f4c444cc4ab6e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1398,9 +1398,9 @@ public void markDeleteComplete(Object ctx) { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - // flaky test case: may throw MetadataStoreException$BadVersionException, race condition: - // 1. my_test_ledger ledger rollover triggers async cursor.asyncMarkDelete() operation. - // 2. factory2.open() triggers recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger. + // flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition: + // 1. my_test_ledger ledger rollover triggers cursor.asyncMarkDelete() operation. + // 2. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger. // 3. cursor.asyncMarkDelete() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB ManagedLedgerInfo // into metaStore. // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo @@ -1412,9 +1412,9 @@ public void markDeleteComplete(Object ctx) { // Three cases: // 1. cursor recovered with lastPosition markDeletePosition - // 2. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger not rolled over, we + // 2. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger not rolled over, we // move markDeletePosition to (lastPositionLegderId+2:-1) - // 3. cursor recovered with (lastPositionLegderId+1:-1) markDeletePosition, cursor ledger rolled over, we + // 3. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger rolled over, we // move markDeletePosition to (lastPositionLegderId+3:-1) // See PR https://github.com/apache/pulsar/pull/25087. log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); From 88d7febde483a72d9d4e3668b361506c036cc6d6 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 21 Dec 2025 21:03:09 +0800 Subject: [PATCH 09/17] [fix][test] Optimize comments --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d3c0bb185d1eb..cdee389b493dc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2727,7 +2727,7 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor); } } else { - // TODO no ledger exists, should we move cursor mark deleted position to nextPointedLedger:-1 + // TODO no ledger exists, should we move cursor mark deleted position to nextPointedLedger:-1 ? log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } From 12c1124ed8affa1465d79f814285c434a2be8fd6 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 22 Dec 2025 15:49:15 +0800 Subject: [PATCH 10/17] [fix][test] Use PositionFactory.create() instead of new ImmutablePositionImpl() --- .../mledger/impl/ManagedCursorTest.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index f4c444cc4ab6e..eece05e609072 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1285,7 +1285,7 @@ void cursorPersistence() throws Exception { assertEquals(c1.getMarkDeletedPosition(), p1); ManagedCursor finalC2 = c2; Awaitility.await() - .untilAsserted(() -> assertEquals(finalC2.getMarkDeletedPosition(), new ImmutablePositionImpl(6, -1))); + .untilAsserted(() -> assertEquals(finalC2.getMarkDeletedPosition(), PositionFactory.create(6, -1))); } @Test(timeOut = 20000) @@ -1422,8 +1422,8 @@ public void markDeleteComplete(Object ctx) { Awaitility.await().untilAsserted(() -> assertTrue( c2.getMarkDeletedPosition().equals(lastPosition.get()) - || c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 2, -1)) - || c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 3, -1)) + || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 2, -1)) + || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 3, -1)) ) ); } @@ -1464,7 +1464,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { }, null); } addEntryLatch.await(); - assertEquals(lastPosition.get(), new ImmutablePositionImpl(13, 0)); + assertEquals(lastPosition.get(), PositionFactory.create(13, 0)); // If we set num=100, to avoid flaky test, we should add Thread.sleep(1000) here to make sure ledger rollover // is finished, but this sleep can not guarantee c1 always recovered with markDeletePosition 12:9. @@ -1513,7 +1513,7 @@ public void markDeleteComplete(Object ctx) { // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087. Awaitility.await() - .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), new ImmutablePositionImpl(15, -1))); + .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), PositionFactory.create(15, -1))); } @Test(timeOut = 20000) @@ -1565,9 +1565,8 @@ public void markDeleteComplete(Object ctx) { log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); long lastPositionLedgerId = lastPosition.get().getLedgerId(); Awaitility.await().untilAsserted(() -> assertTrue( - c2.getMarkDeletedPosition().equals(new ImmutablePositionImpl(lastPositionLedgerId + 1, -1)) - || c2.getMarkDeletedPosition() - .equals(new ImmutablePositionImpl(lastPositionLedgerId + 2, -1)))); + c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 1, -1)) + || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 2, -1)))); } @Test(timeOut = 20000) From 3375d3275206e5605d75be6df61a4976f95b1706 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 09:35:18 +0800 Subject: [PATCH 11/17] [fix][test] Rollback ManagedLedgerImpl production code change --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index cdee389b493dc..4b278cf6664d4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2713,29 +2713,26 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() { for (ManagedCursor cursor : cursors) { Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null ? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition(); - LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); + LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId()); LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId())) .map(Map.Entry::getValue).orElse(null); - if (curPointedLedger != null) { + if (currPointedLedger != null) { if (nextPointedLedger != null) { if (lastAckedPosition.getEntryId() != -1 - && lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) { + && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) { lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1); } } else { log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor); } } else { - // TODO no ledger exists, should we move cursor mark deleted position to nextPointedLedger:-1 ? log.warn("Cursor: {} does not exist in the managed-ledger.", cursor); } - if (lastAckedPosition.compareTo(cursor.getMarkDeletedPosition()) > 0) { + if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) { Position finalPosition = lastAckedPosition; log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition); - // TODO If PR https://github.com/apache/pulsar/pull/25087 is needed, maybe we should make - // maybeUpdateCursorBeforeTrimmingConsumedLedger a callback method too to avoid race condition. cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(), new MarkDeleteCallback() { @Override From 0707d84ae53010ffccf2cc7af29349bf8d04a7bd Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 09:44:12 +0800 Subject: [PATCH 12/17] [fix][test] Avoid using specific positions in assertions --- .../mledger/impl/ManagedCursorTest.java | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index eece05e609072..24d518b0de9f4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1284,8 +1284,7 @@ void cursorPersistence() throws Exception { assertEquals(c1.getMarkDeletedPosition(), p1); ManagedCursor finalC2 = c2; - Awaitility.await() - .untilAsserted(() -> assertEquals(finalC2.getMarkDeletedPosition(), PositionFactory.create(6, -1))); + Awaitility.await().untilAsserted(() -> assertTrue(finalC2.getMarkDeletedPosition().compareTo(p2) > 0)); } @Test(timeOut = 20000) @@ -1418,14 +1417,8 @@ public void markDeleteComplete(Object ctx) { // move markDeletePosition to (lastPositionLegderId+3:-1) // See PR https://github.com/apache/pulsar/pull/25087. log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); - long lastPositionLedgerId = lastPosition.get().getLedgerId(); - Awaitility.await().untilAsserted(() -> - assertTrue( - c2.getMarkDeletedPosition().equals(lastPosition.get()) - || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 2, -1)) - || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 3, -1)) - ) - ); + Awaitility.await() + .untilAsserted(() -> assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) >= 0)); } @Test(timeOut = 20000) @@ -1464,7 +1457,6 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { }, null); } addEntryLatch.await(); - assertEquals(lastPosition.get(), PositionFactory.create(13, 0)); // If we set num=100, to avoid flaky test, we should add Thread.sleep(1000) here to make sure ledger rollover // is finished, but this sleep can not guarantee c1 always recovered with markDeletePosition 12:9. @@ -1513,7 +1505,7 @@ public void markDeleteComplete(Object ctx) { // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087. Awaitility.await() - .untilAsserted(() -> assertEquals(c2.getMarkDeletedPosition(), PositionFactory.create(15, -1))); + .untilAsserted(() -> assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0)); } @Test(timeOut = 20000) @@ -1563,10 +1555,8 @@ public void markDeleteComplete(Object ctx) { // the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover // See PR https://github.com/apache/pulsar/pull/25087. log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); - long lastPositionLedgerId = lastPosition.get().getLedgerId(); - Awaitility.await().untilAsserted(() -> assertTrue( - c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 1, -1)) - || c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId + 2, -1)))); + Awaitility.await() + .untilAsserted(() -> assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0)); } @Test(timeOut = 20000) From eaaff0a2e0b631459fa05d0dc9a82d6e02947d70 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 15:46:56 +0800 Subject: [PATCH 13/17] [fix][test] Use AssertJ assertTrue instead of assertTrue to show the asserted values --- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 24d518b0de9f4..c0b1a55dbdaae 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -20,6 +20,7 @@ import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; @@ -1284,7 +1285,7 @@ void cursorPersistence() throws Exception { assertEquals(c1.getMarkDeletedPosition(), p1); ManagedCursor finalC2 = c2; - Awaitility.await().untilAsserted(() -> assertTrue(finalC2.getMarkDeletedPosition().compareTo(p2) > 0)); + Awaitility.await().untilAsserted(() -> assertThat(finalC2.getMarkDeletedPosition()).isGreaterThan(p2)); } @Test(timeOut = 20000) @@ -1417,8 +1418,8 @@ public void markDeleteComplete(Object ctx) { // move markDeletePosition to (lastPositionLegderId+3:-1) // See PR https://github.com/apache/pulsar/pull/25087. log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); - Awaitility.await() - .untilAsserted(() -> assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) >= 0)); + Awaitility.await().untilAsserted( + () -> assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get())); } @Test(timeOut = 20000) @@ -1505,7 +1506,7 @@ public void markDeleteComplete(Object ctx) { // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087. Awaitility.await() - .untilAsserted(() -> assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0)); + .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get())); } @Test(timeOut = 20000) @@ -1556,7 +1557,7 @@ public void markDeleteComplete(Object ctx) { // See PR https://github.com/apache/pulsar/pull/25087. log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); Awaitility.await() - .untilAsserted(() -> assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0)); + .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get())); } @Test(timeOut = 20000) From 0ad95b30f0cd71fe4f7b5b1255a3707cf0d1b86c Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 20:37:56 +0800 Subject: [PATCH 14/17] [fix][test] Modify incorrect comments --- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c0b1a55dbdaae..3cc137477a6c2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1399,13 +1399,11 @@ public void markDeleteComplete(Object ctx) { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); // flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition: - // 1. my_test_ledger ledger rollover triggers cursor.asyncMarkDelete() operation. - // 2. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger. - // 3. cursor.asyncMarkDelete() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB ManagedLedgerInfo - // into metaStore. - // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo + // 1. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger. + // 2. my_test_ledger ledger rollover triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB + // ManagedLedgerInfo into metaStore. + // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo // into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state. - // See PR https://github.com/apache/pulsar/pull/25087. // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++ ledger = factory2.open("my_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); From bae2c6dcaa49eb0ceeef6766a70fa4a5a1ee41a4 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 21:41:40 +0800 Subject: [PATCH 15/17] [fix][test] Fix the rest flaky tests --- .../mledger/impl/ManagedCursorTest.java | 27 +++++----- .../mledger/impl/NonDurableCursorTest.java | 12 +++-- .../mledger/util/ManagedLedgerTestUtil.java | 54 +++++++++++++++++++ 3 files changed, 75 insertions(+), 18 deletions(-) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 3cc137477a6c2..5cbab27138cac 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -113,6 +113,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; +import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil; import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.collections4.iterators.EmptyIterator; @@ -1393,9 +1394,6 @@ public void markDeleteComplete(Object ctx) { latch.await(); assertEquals(c1.getNumberOfEntries(), 0); - // Sleep 1s here to wait ledger rollover finished - Thread.sleep(1000); - // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); // flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition: @@ -1405,19 +1403,18 @@ public void markDeleteComplete(Object ctx) { // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo // into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state. // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++ - ledger = factory2.open("my_test_ledger"); + // Add retry logic here to prove open operation will finally success despite race condition. + ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger")); ManagedCursor c2 = ledger.openCursor("c1"); // Three cases: // 1. cursor recovered with lastPosition markDeletePosition // 2. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger not rolled over, we - // move markDeletePosition to (lastPositionLegderId+2:-1) + // move markDeletePosition to (lastPositionLedgerId+2:-1) // 3. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger rolled over, we - // move markDeletePosition to (lastPositionLegderId+3:-1) + // move markDeletePosition to (lastPositionLedgerId+3:-1) // See PR https://github.com/apache/pulsar/pull/25087. - log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); - Awaitility.await().untilAsserted( - () -> assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get())); + assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get()); } @Test(timeOut = 20000) @@ -1565,7 +1562,7 @@ void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception { final ManagedCursor c1 = ledger.openCursor("c1"); final int num = 100; - List positions = new ArrayList(); + List positions = new ArrayList<>(); for (int i = 0; i < num; i++) { Position p = ledger.addEntry("dummy-entry".getBytes(Encoding)); positions.add(p); @@ -1594,10 +1591,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + // Add retry logic here to prove open operation will finally success despite race condition. + ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger")); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition); + assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); } @Test(timeOut = 20000) @@ -4670,7 +4668,7 @@ public void testLazyCursorLedgerCreation() throws Exception { ledger.close(); } - @Test(groups = "flaky") + @Test(timeOut = 20000) public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Exception { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); ManagedLedgerImpl ledger = @@ -4682,7 +4680,8 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti ledger = (ManagedLedgerImpl) factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig); assertNotNull(ledger.getCursors().get("test")); ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test"); - assertEquals(cursor1.getMarkDeletedPosition(), p1); + // Reopen ledger may move cursor to next position. See PR https://github.com/apache/pulsar/pull/25087. + assertThat(cursor1.getMarkDeletedPosition()).isGreaterThanOrEqualTo(p1); factory2.shutdown(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 69927f1ebaf66..d53036df26b2c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -50,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -532,7 +534,7 @@ void markDeleteSkippingMessage() throws Exception { assertEquals(cursor.getReadPosition(), PositionFactory.create(p4.getLedgerId(), p4.getEntryId() + 1)); } - @Test(timeOut = 20000, groups = "flaky") + @Test(timeOut = 20000) public void asyncMarkDeleteBlocking() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); @@ -567,16 +569,18 @@ public void markDeleteComplete(Object ctx) { } latch.await(); - assertEquals(c1.getNumberOfEntries(), 0); // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger")); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); + // Since all entries are consumed, we should move mark delete position to nextLedgerId:-1. + // See PR https://github.com/apache/pulsar/pull/25087. + Awaitility.await().untilAsserted( + () -> assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get())); } @Test(timeOut = 20000) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java new file mode 100644 index 0000000000000..0acca723f8f61 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class ManagedLedgerTestUtil { + + public static T retry(ThrowingSupplier supplier) { + return retry(10, supplier); + } + + public static T retry(int retryCount, ThrowingSupplier supplier) { + for (int i = 0; i < retryCount; i++) { + if (i > 0) { + try { + log.info("Retrying after 100ms {}/{}", i, retryCount); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + try { + return supplier.get(); + } catch (Exception e) { + log.warn("Failed to execute supplier: {}", supplier, e); + } + } + throw new RuntimeException("Failed to execute supplier after " + retryCount + " retries"); + } + + @FunctionalInterface + public interface ThrowingSupplier { + T get() throws Exception; + } + +} From 98e3d27651e21a6491168bb30abfe447d086ef7c Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 21:45:20 +0800 Subject: [PATCH 16/17] [fix][test] Fix the testNumberOfEntriesWithReopen flaky tests --- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 5cbab27138cac..5eb20851144e8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -583,7 +583,10 @@ void testNumberOfEntriesWithReopen() throws Exception { @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); + // Add retry logic here to prove open operation will finally success despite race condition. + ledger = ManagedLedgerTestUtil.retry( + () -> factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1))); + c1 = ledger.openCursor("c1"); c2 = ledger.openCursor("c2"); From a1ef783dd65a8d163b6dfe1e29dc15e4e467701d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 21:52:44 +0800 Subject: [PATCH 17/17] [fix][test] Remove unnecessary info log --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 5eb20851144e8..a65d830096eae 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1553,7 +1553,6 @@ public void markDeleteComplete(Object ctx) { // flaky test case: c2.getMarkDeletedPosition() may be equals lastPositionLedgerId+1 or lastPositionLedgerId+2, // the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover // See PR https://github.com/apache/pulsar/pull/25087. - log.info("c2 markDeletePosition: {}, lastPosition: {}", c2.getMarkDeletedPosition(), lastPosition); Awaitility.await() .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get())); }