2020
2121import static org .apache .bookkeeper .mledger .impl .EntryCountEstimator .estimateEntryCountByBytesSize ;
2222import static org .apache .bookkeeper .mledger .impl .cache .RangeEntryCacheImpl .BOOKKEEPER_READ_OVERHEAD_PER_ENTRY ;
23+ import static org .assertj .core .api .AssertionsForInterfaceTypes .assertThat ;
2324import static org .mockito .ArgumentMatchers .anyBoolean ;
2425import static org .mockito .ArgumentMatchers .anyInt ;
2526import static org .mockito .Mockito .any ;
4950import java .util .Arrays ;
5051import java .util .BitSet ;
5152import java .util .Collections ;
53+ import java .util .Deque ;
5254import java .util .HashMap ;
5355import java .util .HashSet ;
5456import java .util .Iterator ;
6062import java .util .concurrent .Callable ;
6163import java .util .concurrent .CompletableFuture ;
6264import java .util .concurrent .ConcurrentHashMap ;
65+ import java .util .concurrent .ConcurrentLinkedDeque ;
6366import java .util .concurrent .ConcurrentSkipListMap ;
6467import java .util .concurrent .CopyOnWriteArrayList ;
6568import java .util .concurrent .CountDownLatch ;
110113import org .apache .bookkeeper .mledger .proto .MLDataFormats ;
111114import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedCursorInfo ;
112115import org .apache .bookkeeper .mledger .proto .MLDataFormats .PositionInfo ;
116+ import org .apache .bookkeeper .mledger .util .ManagedLedgerTestUtil ;
113117import org .apache .bookkeeper .mledger .util .ManagedLedgerUtils ;
114118import org .apache .bookkeeper .test .MockedBookKeeperTestCase ;
115119import org .apache .commons .collections4 .iterators .EmptyIterator ;
@@ -577,7 +581,10 @@ void testNumberOfEntriesWithReopen() throws Exception {
577581
578582 @ Cleanup ("shutdown" )
579583 ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
580- ledger = factory2 .open ("my_test_ledger" , new ManagedLedgerConfig ().setMaxEntriesPerLedger (1 ));
584+ // Retry here to show that the open operation eventually succeeds despite the race condition.
585+ ledger = ManagedLedgerTestUtil .retry (
586+ () -> factory2 .open ("my_test_ledger" , new ManagedLedgerConfig ().setMaxEntriesPerLedger (1 )));
587+
581588
582589 c1 = ledger .openCursor ("c1" );
583590 c2 = ledger .openCursor ("c2" );
@@ -1246,7 +1253,8 @@ void testRemoveCursorFail() throws Exception {
12461253
12471254 @ Test (timeOut = 20000 )
12481255 void cursorPersistence () throws Exception {
1249- ManagedLedger ledger = factory .open ("my_test_ledger" );
1256+ // Open cursor_persistence_ledger ledger, create ledger 3.
1257+ ManagedLedger ledger = factory .open ("cursor_persistence_ledger" );
12501258 ManagedCursor c1 = ledger .openCursor ("c1" );
12511259 ManagedCursor c2 = ledger .openCursor ("c2" );
12521260 ledger .addEntry ("dummy-entry-1" .getBytes (Encoding ));
@@ -1258,26 +1266,28 @@ void cursorPersistence() throws Exception {
12581266
12591267 List <Entry > entries = c1 .readEntries (3 );
12601268 Position p1 = entries .get (2 ).getPosition ();
1269+ // Mark delete; this creates ledger 4 because the cursor ledger state is NoLedger.
12611270 c1 .markDelete (p1 );
12621271 entries .forEach (Entry ::release );
12631272
12641273 entries = c1 .readEntries (4 );
12651274 Position p2 = entries .get (2 ).getPosition ();
1275+ // Mark delete; this creates ledger 5 because the cursor ledger state is NoLedger.
12661276 c2 .markDelete (p2 );
12671277 entries .forEach (Entry ::release );
12681278
12691279 // Reopen
1270-
12711280 @ Cleanup ("shutdown" )
12721281 ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1273- ledger = factory2 .open ("my_test_ledger" );
1282+ // Reopening cursor_persistence_ledger runs recovery, creates ledger 6, and moves the mark-delete position to 6:-1.
1283+ // See PR https://github.com/apache/pulsar/pull/25087.
1284+ ledger = factory2 .open ("cursor_persistence_ledger" );
12741285 c1 = ledger .openCursor ("c1" );
12751286 c2 = ledger .openCursor ("c2" );
12761287
12771288 assertEquals (c1 .getMarkDeletedPosition (), p1 );
1278- // move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed
12791289 ManagedCursor finalC2 = c2 ;
1280- Awaitility .await ().untilAsserted (() -> assertNotEquals (finalC2 .getMarkDeletedPosition (), p2 ));
1290+ Awaitility .await ().untilAsserted (() -> assertThat (finalC2 .getMarkDeletedPosition ()). isGreaterThan ( p2 ));
12811291 }
12821292
12831293 @ Test (timeOut = 20000 )
@@ -1348,7 +1358,7 @@ void cursorPersistence2() throws Exception {
13481358 assertEquals (c4 .getMarkDeletedPosition (), p1 );
13491359 }
13501360
1351- @ Test
1361+ @ Test ( timeOut = 20000 )
13521362 public void asyncMarkDeleteBlocking () throws Exception {
13531363 ManagedLedgerConfig config = new ManagedLedgerConfig ();
13541364 config .setMaxEntriesPerLedger (10 );
@@ -1383,16 +1393,166 @@ public void markDeleteComplete(Object ctx) {
13831393 }
13841394
13851395 latch .await ();
1396+ assertEquals (c1 .getNumberOfEntries (), 0 );
1397+
1398+ // Reopen
1399+ @ Cleanup ("shutdown" ) ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1400+ // Flaky case: factory2.open() may throw MetadataStoreException$BadVersionException because of this race:
1401+ // 1. factory2.open() triggers ledger recovery and reads version A of my_test_ledger's ManagedLedgerInfo.
1402+ // 2. A rollover of my_test_ledger triggers MetaStoreImpl.asyncUpdateLedgerIds(), which writes version B of
1403+ // the ManagedLedgerInfo to the metadata store.
1404+ // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds() with the stale version A, which fails
1405+ // with BadVersionException and moves my_test_ledger to the fenced state.
1406+ // Recovery open of my_test_ledger, ledgerId++.
1407+ // Retry here to show that the open operation eventually succeeds despite the race condition.
1408+ ledger = ManagedLedgerTestUtil .retry (() -> factory2 .open ("my_test_ledger" ));
1409+ ManagedCursor c2 = ledger .openCursor ("c1" );
1410+
1411+ // Three cases:
1412+ // 1. The cursor is recovered with markDeletePosition equal to lastPosition.
1413+ // 2. The cursor is recovered with markDeletePosition (lastPositionLedgerId+1:-1) and the cursor ledger has not
1414+ // rolled over, so markDeletePosition is moved to (lastPositionLedgerId+2:-1).
1415+ // 3. The cursor is recovered with markDeletePosition (lastPositionLedgerId+1:-1) and the cursor ledger has
1416+ // rolled over, so markDeletePosition is moved to (lastPositionLedgerId+3:-1).
1417+ // See PR https://github.com/apache/pulsar/pull/25087.
1418+ assertThat (c2 .getMarkDeletedPosition ()).isGreaterThanOrEqualTo (lastPosition .get ());
1419+ }
13861420
1421+ @ Test (timeOut = 20000 )
1422+ public void asyncMarkDeleteBlockingWithOneShot () throws Exception {
1423+ ManagedLedgerConfig config = new ManagedLedgerConfig ();
1424+ config .setMaxEntriesPerLedger (10 );
1425+ // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
1426+ ManagedLedger ledger = factory .open ("async_mark_delete_blocking_test_ledger" , config );
1427+ final ManagedCursor c1 = ledger .openCursor ("c1" );
1428+ final AtomicReference <Position > lastPosition = new AtomicReference <>();
1429+ // Collected only for debug logging.
1430+ Deque <Position > positions = new ConcurrentLinkedDeque <>();
1431+
1432+ // The earlier, flaky version of this test used num=100, and PR https://github.com/apache/pulsar/pull/25087
1433+ // makes that version even flakier. With num=100 the cursor may be recovered in either of two ways:
1434+ // 1. markDeletePosition 12:9, persistentMarkDeletePosition 12:9.
1435+ // 2. markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1.
1436+ // Setting num to 101 ensures that ledger 13 is created and becomes the active (last) ledger, so the
1437+ // cursor is always recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0.
1438+ final int num = 101 ;
1439+ final CountDownLatch addEntryLatch = new CountDownLatch (num );
1440+ // 10 entries per ledger, create ledger 4~13
1441+ for (int i = 0 ; i < num ; i ++) {
1442+ String entryStr = "entry-" + i ;
1443+ ledger .asyncAddEntry (entryStr .getBytes (Encoding ), new AddEntryCallback () {
1444+ @ Override
1445+ public void addFailed (ManagedLedgerException exception , Object ctx ) {
1446+ }
1447+
1448+ @ Override
1449+ public void addComplete (Position position , ByteBuf entryData , Object ctx ) {
1450+ lastPosition .set (position );
1451+ positions .offer (position );
1452+ addEntryLatch .countDown ();
1453+ }
1454+ }, null );
1455+ }
1456+ addEntryLatch .await ();
1457+
1458+ // With num=100 we would need Thread.sleep(1000) here to wait for the ledger rollover to finish, and
1459+ // even that sleep could not guarantee that c1 is always recovered with markDeletePosition 12:9.
1460+ // Thread.sleep(1000);
1461+
1462+ final CountDownLatch markDeleteLatch = new CountDownLatch (1 );
1463+ // Mark delete; this creates ledger 14 because the cursor ledger state is NoLedger.
1464+ // In the flaky num=100 case, the markDelete operation is triggered twice:
1465+ // 1. First by c1.asyncMarkDelete(), with markDeletePosition 12:9.
1466+ // 2. Then by ManagedLedgerImpl.updateLedgersIdsComplete() because of the ledger-full rollover.
1467+ // The entries in ledger 12 are all consumed, so persistentMarkDeletePosition and
1468+ // markDeletePosition are moved to 13:-1 due to PR https://github.com/apache/pulsar/pull/25087.
1469+ // Before that PR, persistentMarkDeletePosition was not moved.
1470+ // The two markDelete operations are triggered at almost the same time, with no ordering guarantee:
1471+ // 1. The main thread triggers c1.asyncMarkDelete().
1472+ // 2. The bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggers the creation of ledger 13 because of
1473+ // the ledger-full rollover in OpAddEntry.
1474+ // OpAddEntry closes the ledger and creates a new one when closeWhenDone is true.
1475+ // In ManagedLedgerImpl, the MetaStoreCallback cb calls maybeUpdateCursorBeforeTrimmingConsumedLedger(),
1476+ // which calls cursor.asyncMarkDelete(), so the markDelete operation of the ledger rollover may execute after
1477+ // AddEntryCallback.addComplete(). The root cause is that cursor.asyncMarkDelete() does not propagate
1478+ // completion or failure to its caller's callback.
1479+ c1 .asyncMarkDelete (lastPosition .get (), new MarkDeleteCallback () {
1480+ @ Override
1481+ public void markDeleteFailed (ManagedLedgerException exception , Object ctx ) {
1482+ }
1483+
1484+ @ Override
1485+ public void markDeleteComplete (Object ctx ) {
1486+ markDeleteLatch .countDown ();
1487+ }
1488+ }, null );
1489+ markDeleteLatch .await ();
13871490 assertEquals (c1 .getNumberOfEntries (), 0 );
13881491
13891492 // Reopen
1390- @ Cleanup ("shutdown" )
1391- ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1392- ledger = factory2 .open ("my_test_ledger" );
1493+ @ Cleanup ("shutdown" ) ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1494+ // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15.
1495+ // When executing ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the curPointedLedger is 13,
1496+ // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 13:0,
1497+ // so we will move markDeletePosition to 15:-1, see PR https://github.com/apache/pulsar/pull/25087.
1498+ ledger = factory2 .open ("async_mark_delete_blocking_test_ledger" );
13931499 ManagedCursor c2 = ledger .openCursor ("c1" );
13941500
1395- assertEquals (c2 .getMarkDeletedPosition (), lastPosition .get ());
1501+ log .info ("positions size: {}, positions: {}" , positions .size (), positions );
1502+ // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() has completed, wait until
1503+ // c2.getMarkDeletedPosition() moves past lastPosition (expected 15:-1), see PR https://github.com/apache/pulsar/pull/25087.
1504+ Awaitility .await ()
1505+ .untilAsserted (() -> assertThat (c2 .getMarkDeletedPosition ()).isGreaterThan (lastPosition .get ()));
1506+ }
1507+
1508+ @ Test (timeOut = 20000 )
1509+ public void asyncMarkDeleteBlockingWithMultiShots () throws Exception {
1510+ ManagedLedgerConfig config = new ManagedLedgerConfig ();
1511+ config .setMaxEntriesPerLedger (10 );
1512+ config .setMetadataMaxEntriesPerLedger (5 );
1513+ ManagedLedger ledger = factory .open ("async_mark_delete_blocking_test_ledger" , config );
1514+ final ManagedCursor c1 = ledger .openCursor ("c1" );
1515+ final AtomicReference <Position > lastPosition = new AtomicReference <>();
1516+
1517+ final int num = 101 ;
1518+ final CountDownLatch addEntryLatch = new CountDownLatch (num );
1519+ for (int i = 0 ; i < num ; i ++) {
1520+ String entryStr = "entry-" + i ;
1521+ ledger .asyncAddEntry (entryStr .getBytes (Encoding ), new AddEntryCallback () {
1522+ @ Override
1523+ public void addFailed (ManagedLedgerException exception , Object ctx ) {
1524+ }
1525+
1526+ @ Override
1527+ public void addComplete (Position position , ByteBuf entryData , Object ctx ) {
1528+ lastPosition .set (position );
1529+ c1 .asyncMarkDelete (lastPosition .get (), new MarkDeleteCallback () {
1530+ @ Override
1531+ public void markDeleteFailed (ManagedLedgerException exception , Object ctx ) {
1532+ }
1533+
1534+ @ Override
1535+ public void markDeleteComplete (Object ctx ) {
1536+ addEntryLatch .countDown ();
1537+ }
1538+ }, null );
1539+
1540+ }
1541+ }, null );
1542+ }
1543+ addEntryLatch .await ();
1544+ assertEquals (c1 .getNumberOfEntries (), 0 );
1545+
1546+ // Reopen
1547+ @ Cleanup ("shutdown" ) ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1548+ ledger = factory2 .open ("async_mark_delete_blocking_test_ledger" );
1549+ ManagedCursor c2 = ledger .openCursor ("c1" );
1550+
1551+ // Flaky case: c2.getMarkDeletedPosition() may point to ledger lastPositionLedgerId+1 or lastPositionLedgerId+2,
1552+ // because the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover.
1553+ // See PR https://github.com/apache/pulsar/pull/25087.
1554+ Awaitility .await ()
1555+ .untilAsserted (() -> assertThat (c2 .getMarkDeletedPosition ()).isGreaterThan (lastPosition .get ()));
13961556 }
13971557
13981558 @ Test (timeOut = 20000 )
@@ -1402,7 +1562,7 @@ void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception {
14021562 final ManagedCursor c1 = ledger .openCursor ("c1" );
14031563
14041564 final int num = 100 ;
1405- List <Position > positions = new ArrayList ();
1565+ List <Position > positions = new ArrayList <> ();
14061566 for (int i = 0 ; i < num ; i ++) {
14071567 Position p = ledger .addEntry ("dummy-entry" .getBytes (Encoding ));
14081568 positions .add (p );
@@ -1431,10 +1591,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
14311591 // Reopen
14321592 @ Cleanup ("shutdown" )
14331593 ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl (metadataStore , bkc );
1434- ledger = factory2 .open ("my_test_ledger" );
1594+ // Retry here to show that the open operation eventually succeeds despite the race condition.
1595+ ledger = ManagedLedgerTestUtil .retry (() -> factory2 .open ("my_test_ledger" ));
14351596 ManagedCursor c2 = ledger .openCursor ("c1" );
14361597
1437- assertEquals (c2 .getMarkDeletedPosition (), lastPosition );
1598+ assertThat (c2 .getMarkDeletedPosition ()). isGreaterThanOrEqualTo ( lastPosition );
14381599 }
14391600
14401601 @ Test (timeOut = 20000 )
@@ -4499,7 +4660,7 @@ public void testLazyCursorLedgerCreation() throws Exception {
44994660 ledger .close ();
45004661 }
45014662
4502- @ Test
4663+ @ Test ( timeOut = 20000 )
45034664 public void testLazyCursorLedgerCreationForSubscriptionCreation () throws Exception {
45044665 ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig ();
45054666 ManagedLedgerImpl ledger =
@@ -4511,7 +4672,8 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti
45114672 ledger = (ManagedLedgerImpl ) factory2 .open ("testLazyCursorLedgerCreation" , managedLedgerConfig );
45124673 assertNotNull (ledger .getCursors ().get ("test" ));
45134674 ManagedCursorImpl cursor1 = (ManagedCursorImpl ) ledger .openCursor ("test" );
4514- assertEquals (cursor1 .getMarkDeletedPosition (), p1 );
4675+ // Reopening the ledger may move the cursor to the next position. See PR https://github.com/apache/pulsar/pull/25087.
4676+ assertThat (cursor1 .getMarkDeletedPosition ()).isGreaterThanOrEqualTo (p1 );
45154677 factory2 .shutdown ();
45164678 }
45174679
0 commit comments