2020
2121import static org .apache .bookkeeper .mledger .impl .EntryCountEstimator .estimateEntryCountByBytesSize ;
2222import static org .apache .bookkeeper .mledger .impl .cache .RangeEntryCacheImpl .BOOKKEEPER_READ_OVERHEAD_PER_ENTRY ;
23+ import static org .assertj .core .api .AssertionsForInterfaceTypes .assertThat ;
2324import static org .mockito .ArgumentMatchers .anyBoolean ;
2425import static org .mockito .ArgumentMatchers .anyInt ;
2526import static org .mockito .Mockito .any ;
4950import java .util .Arrays ;
5051import java .util .BitSet ;
5152import java .util .Collections ;
53+ import java .util .Deque ;
5254import java .util .HashMap ;
5355import java .util .HashSet ;
5456import java .util .Iterator ;
6062import java .util .concurrent .Callable ;
6163import java .util .concurrent .CompletableFuture ;
6264import java .util .concurrent .ConcurrentHashMap ;
65+ import java .util .concurrent .ConcurrentLinkedDeque ;
6366import java .util .concurrent .ConcurrentSkipListMap ;
6467import java .util .concurrent .CopyOnWriteArrayList ;
6568import java .util .concurrent .CountDownLatch ;
110113import org .apache .bookkeeper .mledger .proto .MLDataFormats ;
111114import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedCursorInfo ;
112115import org .apache .bookkeeper .mledger .proto .MLDataFormats .PositionInfo ;
116+ import org .apache .bookkeeper .mledger .util .ManagedLedgerTestUtil ;
113117import org .apache .bookkeeper .mledger .util .ManagedLedgerUtils ;
114118import org .apache .bookkeeper .test .MockedBookKeeperTestCase ;
115119import org .apache .commons .collections4 .iterators .EmptyIterator ;
@@ -579,7 +583,10 @@ void testNumberOfEntriesWithReopen() throws Exception {
579583
580584 @ Cleanup ("shutdown" )
581585 ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
582- ledger = factory2 .open ("my_test_ledger" , new ManagedLedgerConfig ().setMaxEntriesPerLedger (1 ));
586+ // Add retry logic here to prove open operation will finally success despite race condition.
587+ ledger = ManagedLedgerTestUtil .retry (
588+ () -> factory2 .open ("my_test_ledger" , new ManagedLedgerConfig ().setMaxEntriesPerLedger (1 )));
589+
583590
584591 c1 = ledger .openCursor ("c1" );
585592 c2 = ledger .openCursor ("c2" );
@@ -1248,7 +1255,8 @@ void testRemoveCursorFail() throws Exception {
12481255
12491256 @ Test (timeOut = 20000 )
12501257 void cursorPersistence () throws Exception {
1251- ManagedLedger ledger = factory .open ("my_test_ledger" );
1258+ // Open cursor_persistence_ledger ledger, create ledger 3.
1259+ ManagedLedger ledger = factory .open ("cursor_persistence_ledger" );
12521260 ManagedCursor c1 = ledger .openCursor ("c1" );
12531261 ManagedCursor c2 = ledger .openCursor ("c2" );
12541262 ledger .addEntry ("dummy-entry-1" .getBytes (Encoding ));
@@ -1260,26 +1268,28 @@ void cursorPersistence() throws Exception {
12601268
12611269 List <Entry > entries = c1 .readEntries (3 );
12621270 Position p1 = entries .get (2 ).getPosition ();
1271+ // Mark delete, create ledger 4 due to cursor ledger state is NoLedger.
12631272 c1 .markDelete (p1 );
12641273 entries .forEach (Entry ::release );
12651274
12661275 entries = c1 .readEntries (4 );
12671276 Position p2 = entries .get (2 ).getPosition ();
1277+ // Mark delete, create ledger 5 due to cursor ledger state is NoLedger.
12681278 c2 .markDelete (p2 );
12691279 entries .forEach (Entry ::release );
12701280
12711281 // Reopen
1272-
12731282 @ Cleanup ("shutdown" )
12741283 ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1275- ledger = factory2 .open ("my_test_ledger" );
1284+ // Recovery open cursor_persistence_ledger ledger, create ledger 6, and move mark delete position to 6:-1.
1285+ // See PR https://github.com/apache/pulsar/pull/25087.
1286+ ledger = factory2 .open ("cursor_persistence_ledger" );
12761287 c1 = ledger .openCursor ("c1" );
12771288 c2 = ledger .openCursor ("c2" );
12781289
12791290 assertEquals (c1 .getMarkDeletedPosition (), p1 );
1280- // move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed
12811291 ManagedCursor finalC2 = c2 ;
1282- Awaitility .await ().untilAsserted (() -> assertNotEquals (finalC2 .getMarkDeletedPosition (), p2 ));
1292+ Awaitility .await ().untilAsserted (() -> assertThat (finalC2 .getMarkDeletedPosition ()). isGreaterThan ( p2 ));
12831293 }
12841294
12851295 @ Test (timeOut = 20000 )
@@ -1350,7 +1360,7 @@ void cursorPersistence2() throws Exception {
13501360 assertEquals (c4 .getMarkDeletedPosition (), p1 );
13511361 }
13521362
1353- @ Test
1363+ @ Test ( timeOut = 20000 )
13541364 public void asyncMarkDeleteBlocking () throws Exception {
13551365 ManagedLedgerConfig config = new ManagedLedgerConfig ();
13561366 config .setMaxEntriesPerLedger (10 );
@@ -1385,16 +1395,166 @@ public void markDeleteComplete(Object ctx) {
13851395 }
13861396
13871397 latch .await ();
1398+ assertEquals (c1 .getNumberOfEntries (), 0 );
1399+
1400+ // Reopen
1401+ @ Cleanup ("shutdown" ) ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1402+ // flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition:
1403+ // 1. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger.
1404+ // 2. my_test_ledger ledger rollover triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB
1405+ // ManagedLedgerInfo into metaStore.
1406+ // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo
1407+ // into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state.
1408+ // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++
1409+ // Add retry logic here to prove open operation will finally success despite race condition.
1410+ ledger = ManagedLedgerTestUtil .retry (() -> factory2 .open ("my_test_ledger" ));
1411+ ManagedCursor c2 = ledger .openCursor ("c1" );
1412+
1413+ // Three cases:
1414+ // 1. cursor recovered with lastPosition markDeletePosition
1415+ // 2. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger not rolled over, we
1416+ // move markDeletePosition to (lastPositionLedgerId+2:-1)
1417+ // 3. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger rolled over, we
1418+ // move markDeletePosition to (lastPositionLedgerId+3:-1)
1419+ // See PR https://github.com/apache/pulsar/pull/25087.
1420+ assertThat (c2 .getMarkDeletedPosition ()).isGreaterThanOrEqualTo (lastPosition .get ());
1421+ }
13881422
1423+ @ Test (timeOut = 20000 )
1424+ public void asyncMarkDeleteBlockingWithOneShot () throws Exception {
1425+ ManagedLedgerConfig config = new ManagedLedgerConfig ();
1426+ config .setMaxEntriesPerLedger (10 );
1427+ // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
1428+ ManagedLedger ledger = factory .open ("async_mark_delete_blocking_test_ledger" , config );
1429+ final ManagedCursor c1 = ledger .openCursor ("c1" );
1430+ final AtomicReference <Position > lastPosition = new AtomicReference <>();
1431+ // just for log debug purpose
1432+ Deque <Position > positions = new ConcurrentLinkedDeque <>();
1433+
1434+ // In previous flaky test, we set num=100, PR https://github.com/apache/pulsar/pull/25087 will make the test
1435+ // more flaky. Flaky case:
1436+ // 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9.
1437+ // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1.
1438+ // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger,
1439+ // and cursor will always be recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0.
1440+ final int num = 101 ;
1441+ final CountDownLatch addEntryLatch = new CountDownLatch (num );
1442+ // 10 entries per ledger, create ledger 4~13
1443+ for (int i = 0 ; i < num ; i ++) {
1444+ String entryStr = "entry-" + i ;
1445+ ledger .asyncAddEntry (entryStr .getBytes (Encoding ), new AddEntryCallback () {
1446+ @ Override
1447+ public void addFailed (ManagedLedgerException exception , Object ctx ) {
1448+ }
1449+
1450+ @ Override
1451+ public void addComplete (Position position , ByteBuf entryData , Object ctx ) {
1452+ lastPosition .set (position );
1453+ positions .offer (position );
1454+ addEntryLatch .countDown ();
1455+ }
1456+ }, null );
1457+ }
1458+ addEntryLatch .await ();
1459+
1460+ // If we set num=100, to avoid flaky test, we should add Thread.sleep(1000) here to make sure ledger rollover
1461+ // is finished, but this sleep can not guarantee c1 always recovered with markDeletePosition 12:9.
1462+ // Thread.sleep(1000);
1463+
1464+ final CountDownLatch markDeleteLatch = new CountDownLatch (1 );
1465+ // Mark delete, create ledger 14 due to cursor ledger state is NoLedger.
1466+ // The num=100 flaky test case, markDelete operation is triggered twice:
1467+ // 1. first is triggered by c1.asyncMarkDelete(), markDeletePosition is 12:9.
1468+ // 2. second is triggered by ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover,
1469+ // The entries in ledger 12 are all consumed, and we move persistentMarkDeletePosition and
1470+ // markDeletePosition to 13:-1 due to PR https://github.com/apache/pulsar/pull/25087.
1471+ // Before this pr, we will not move persistentMarkDeletePosition.
1472+ // Two markDelete operations is almost triggered at the same time without order guarantee:
1473+ // 1. main thread triggered c1.asyncMarkDelete.
1474+ // 2. bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggered create ledger 13 due to ledger full
1475+ // rollover by OpAddEntry.
1476+ // OpAddEntry will close and create a new ledger when closeWhenDone is true.
1477+ // In ManagedLedgerImpl class, MetaStoreCallback cb calls maybeUpdateCursorBeforeTrimmingConsumedLedger(),
1478+ // which calls cursor.asyncMarkDelete(), so markDelete operation in ledger rollover may execute after
1479+ // AddEntryCallback.addComplete(). The root cause is cursor.asyncMarkDelete() does not propagate completion or
1480+ // failure to it caller callback
1481+ c1 .asyncMarkDelete (lastPosition .get (), new MarkDeleteCallback () {
1482+ @ Override
1483+ public void markDeleteFailed (ManagedLedgerException exception , Object ctx ) {
1484+ }
1485+
1486+ @ Override
1487+ public void markDeleteComplete (Object ctx ) {
1488+ markDeleteLatch .countDown ();
1489+ }
1490+ }, null );
1491+ markDeleteLatch .await ();
13891492 assertEquals (c1 .getNumberOfEntries (), 0 );
13901493
13911494 // Reopen
1392- @ Cleanup ("shutdown" )
1393- ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1394- ledger = factory2 .open ("my_test_ledger" );
1495+ @ Cleanup ("shutdown" ) ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1496+ // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15.
1497+ // When executing ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the curPointedLedger is 13,
1498+ // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 13:0,
1499+ // so we will move markDeletePosition to 15:-1, see PR https://github.com/apache/pulsar/pull/25087.
1500+ ledger = factory2 .open ("async_mark_delete_blocking_test_ledger" );
13951501 ManagedCursor c2 = ledger .openCursor ("c1" );
13961502
1397- assertEquals (c2 .getMarkDeletedPosition (), lastPosition .get ());
1503+ log .info ("positions size: {}, positions: {}" , positions .size (), positions );
1504+ // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should
1505+ // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087.
1506+ Awaitility .await ()
1507+ .untilAsserted (() -> assertThat (c2 .getMarkDeletedPosition ()).isGreaterThan (lastPosition .get ()));
1508+ }
1509+
1510+ @ Test (timeOut = 20000 )
1511+ public void asyncMarkDeleteBlockingWithMultiShots () throws Exception {
1512+ ManagedLedgerConfig config = new ManagedLedgerConfig ();
1513+ config .setMaxEntriesPerLedger (10 );
1514+ config .setMetadataMaxEntriesPerLedger (5 );
1515+ ManagedLedger ledger = factory .open ("async_mark_delete_blocking_test_ledger" , config );
1516+ final ManagedCursor c1 = ledger .openCursor ("c1" );
1517+ final AtomicReference <Position > lastPosition = new AtomicReference <>();
1518+
1519+ final int num = 101 ;
1520+ final CountDownLatch addEntryLatch = new CountDownLatch (num );
1521+ for (int i = 0 ; i < num ; i ++) {
1522+ String entryStr = "entry-" + i ;
1523+ ledger .asyncAddEntry (entryStr .getBytes (Encoding ), new AddEntryCallback () {
1524+ @ Override
1525+ public void addFailed (ManagedLedgerException exception , Object ctx ) {
1526+ }
1527+
1528+ @ Override
1529+ public void addComplete (Position position , ByteBuf entryData , Object ctx ) {
1530+ lastPosition .set (position );
1531+ c1 .asyncMarkDelete (lastPosition .get (), new MarkDeleteCallback () {
1532+ @ Override
1533+ public void markDeleteFailed (ManagedLedgerException exception , Object ctx ) {
1534+ }
1535+
1536+ @ Override
1537+ public void markDeleteComplete (Object ctx ) {
1538+ addEntryLatch .countDown ();
1539+ }
1540+ }, null );
1541+
1542+ }
1543+ }, null );
1544+ }
1545+ addEntryLatch .await ();
1546+ assertEquals (c1 .getNumberOfEntries (), 0 );
1547+
1548+ // Reopen
1549+ @ Cleanup ("shutdown" ) ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1550+ ledger = factory2 .open ("async_mark_delete_blocking_test_ledger" );
1551+ ManagedCursor c2 = ledger .openCursor ("c1" );
1552+
1553+ // flaky test case: c2.getMarkDeletedPosition() may be equals lastPositionLedgerId+1 or lastPositionLedgerId+2,
1554+ // the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover
1555+ // See PR https://github.com/apache/pulsar/pull/25087.
1556+ Awaitility .await ()
1557+ .untilAsserted (() -> assertThat (c2 .getMarkDeletedPosition ()).isGreaterThan (lastPosition .get ()));
13981558 }
13991559
14001560 @ Test (timeOut = 20000 )
@@ -1404,7 +1564,7 @@ void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception {
14041564 final ManagedCursor c1 = ledger .openCursor ("c1" );
14051565
14061566 final int num = 100 ;
1407- List <Position > positions = new ArrayList ();
1567+ List <Position > positions = new ArrayList <> ();
14081568 for (int i = 0 ; i < num ; i ++) {
14091569 Position p = ledger .addEntry ("dummy-entry" .getBytes (Encoding ));
14101570 positions .add (p );
@@ -1433,10 +1593,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
14331593 // Reopen
14341594 @ Cleanup ("shutdown" )
14351595 ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1436- ledger = factory2 .open ("my_test_ledger" );
1596+ // Add retry logic here to prove open operation will finally success despite race condition.
1597+ ledger = ManagedLedgerTestUtil .retry (() -> factory2 .open ("my_test_ledger" ));
14371598 ManagedCursor c2 = ledger .openCursor ("c1" );
14381599
1439- assertEquals (c2 .getMarkDeletedPosition (), lastPosition );
1600+ assertThat (c2 .getMarkDeletedPosition ()). isGreaterThanOrEqualTo ( lastPosition );
14401601 }
14411602
14421603 @ Test (timeOut = 20000 )
@@ -4509,7 +4670,7 @@ public void testLazyCursorLedgerCreation() throws Exception {
45094670 ledger .close ();
45104671 }
45114672
4512- @ Test
4673+ @ Test ( timeOut = 20000 )
45134674 public void testLazyCursorLedgerCreationForSubscriptionCreation () throws Exception {
45144675 ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig ();
45154676 ManagedLedgerImpl ledger =
@@ -4521,7 +4682,8 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti
45214682 ledger = (ManagedLedgerImpl ) factory2 .open ("testLazyCursorLedgerCreation" , managedLedgerConfig );
45224683 assertNotNull (ledger .getCursors ().get ("test" ));
45234684 ManagedCursorImpl cursor1 = (ManagedCursorImpl ) ledger .openCursor ("test" );
4524- assertEquals (cursor1 .getMarkDeletedPosition (), p1 );
4685+ // Reopen ledger may move cursor to next position. See PR https://github.com/apache/pulsar/pull/25087.
4686+ assertThat (cursor1 .getMarkDeletedPosition ()).isGreaterThanOrEqualTo (p1 );
45254687 factory2 .shutdown ();
45264688 }
45274689
0 commit comments