diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d56352c119795..a65d830096eae 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -20,6 +20,7 @@ import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; @@ -49,6 +50,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -60,6 +62,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -110,6 +113,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; +import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil; import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.collections4.iterators.EmptyIterator; @@ -579,7 +583,10 @@ void testNumberOfEntriesWithReopen() throws Exception { @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); + // Add retry logic here to prove open operation will finally success despite race condition. + ledger = ManagedLedgerTestUtil.retry( + () -> factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1))); + c1 = ledger.openCursor("c1"); c2 = ledger.openCursor("c2"); @@ -1248,7 +1255,8 @@ void testRemoveCursorFail() throws Exception { @Test(timeOut = 20000) void cursorPersistence() throws Exception { - ManagedLedger ledger = factory.open("my_test_ledger"); + // Open cursor_persistence_ledger ledger, create ledger 3. + ManagedLedger ledger = factory.open("cursor_persistence_ledger"); ManagedCursor c1 = ledger.openCursor("c1"); ManagedCursor c2 = ledger.openCursor("c2"); ledger.addEntry("dummy-entry-1".getBytes(Encoding)); @@ -1260,26 +1268,28 @@ void cursorPersistence() throws Exception { List entries = c1.readEntries(3); Position p1 = entries.get(2).getPosition(); + // Mark delete, create ledger 4 due to cursor ledger state is NoLedger. c1.markDelete(p1); entries.forEach(Entry::release); entries = c1.readEntries(4); Position p2 = entries.get(2).getPosition(); + // Mark delete, create ledger 5 due to cursor ledger state is NoLedger. c2.markDelete(p2); entries.forEach(Entry::release); // Reopen - @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + // Recovery open cursor_persistence_ledger ledger, create ledger 6, and move mark delete position to 6:-1. + // See PR https://github.com/apache/pulsar/pull/25087. + ledger = factory2.open("cursor_persistence_ledger"); c1 = ledger.openCursor("c1"); c2 = ledger.openCursor("c2"); assertEquals(c1.getMarkDeletedPosition(), p1); - // move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed ManagedCursor finalC2 = c2; - Awaitility.await().untilAsserted(() -> assertNotEquals(finalC2.getMarkDeletedPosition(), p2)); + Awaitility.await().untilAsserted(() -> assertThat(finalC2.getMarkDeletedPosition()).isGreaterThan(p2)); } @Test(timeOut = 20000) @@ -1350,7 +1360,7 @@ void cursorPersistence2() throws Exception { assertEquals(c4.getMarkDeletedPosition(), p1); } - @Test(groups = "flaky") + @Test(timeOut = 20000) public void asyncMarkDeleteBlocking() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); @@ -1385,16 +1395,166 @@ public void markDeleteComplete(Object ctx) { } latch.await(); + assertEquals(c1.getNumberOfEntries(), 0); + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition: + // 1. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger. + // 2. my_test_ledger ledger rollover triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB + // ManagedLedgerInfo into metaStore. + // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo + // into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state. + // Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++ + // Add retry logic here to prove open operation will finally success despite race condition. + ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger")); + ManagedCursor c2 = ledger.openCursor("c1"); + + // Three cases: + // 1. cursor recovered with lastPosition markDeletePosition + // 2. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger not rolled over, we + // move markDeletePosition to (lastPositionLedgerId+2:-1) + // 3. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger rolled over, we + // move markDeletePosition to (lastPositionLedgerId+3:-1) + // See PR https://github.com/apache/pulsar/pull/25087. + assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get()); + } + + @Test(timeOut = 20000) + public void asyncMarkDeleteBlockingWithOneShot() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + // open async_mark_delete_blocking_test_ledger ledger, create ledger 3. + ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + final AtomicReference lastPosition = new AtomicReference<>(); + // just for log debug purpose + Deque positions = new ConcurrentLinkedDeque<>(); + + // In previous flaky test, we set num=100, PR https://github.com/apache/pulsar/pull/25087 will make the test + // more flaky. Flaky case: + // 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9. + // 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1. + // Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger, + // and cursor will always be recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0. + final int num = 101; + final CountDownLatch addEntryLatch = new CountDownLatch(num); + // 10 entries per ledger, create ledger 4~13 + for (int i = 0; i < num; i++) { + String entryStr = "entry-" + i; + ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + lastPosition.set(position); + positions.offer(position); + addEntryLatch.countDown(); + } + }, null); + } + addEntryLatch.await(); + + // If we set num=100, to avoid flaky test, we should add Thread.sleep(1000) here to make sure ledger rollover + // is finished, but this sleep can not guarantee c1 always recovered with markDeletePosition 12:9. + // Thread.sleep(1000); + + final CountDownLatch markDeleteLatch = new CountDownLatch(1); + // Mark delete, create ledger 14 due to cursor ledger state is NoLedger. + // The num=100 flaky test case, markDelete operation is triggered twice: + // 1. first is triggered by c1.asyncMarkDelete(), markDeletePosition is 12:9. + // 2. second is triggered by ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover, + // The entries in ledger 12 are all consumed, and we move persistentMarkDeletePosition and + // markDeletePosition to 13:-1 due to PR https://github.com/apache/pulsar/pull/25087. + // Before this pr, we will not move persistentMarkDeletePosition. + // Two markDelete operations is almost triggered at the same time without order guarantee: + // 1. main thread triggered c1.asyncMarkDelete. + // 2. bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggered create ledger 13 due to ledger full + // rollover by OpAddEntry. + // OpAddEntry will close and create a new ledger when closeWhenDone is true. + // In ManagedLedgerImpl class, MetaStoreCallback cb calls maybeUpdateCursorBeforeTrimmingConsumedLedger(), + // which calls cursor.asyncMarkDelete(), so markDelete operation in ledger rollover may execute after + // AddEntryCallback.addComplete(). The root cause is cursor.asyncMarkDelete() does not propagate completion or + // failure to it caller callback + c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + markDeleteLatch.countDown(); + } + }, null); + markDeleteLatch.await(); assertEquals(c1.getNumberOfEntries(), 0); // Reopen - @Cleanup("shutdown") - ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15. + // When executing ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the curPointedLedger is 13, + // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 13:0, + // so we will move markDeletePosition to 15:-1, see PR https://github.com/apache/pulsar/pull/25087. + ledger = factory2.open("async_mark_delete_blocking_test_ledger"); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); + log.info("positions size: {}, positions: {}", positions.size(), positions); + // To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should + // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087. + Awaitility.await() + .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get())); + } + + @Test(timeOut = 20000) + public void asyncMarkDeleteBlockingWithMultiShots() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMetadataMaxEntriesPerLedger(5); + ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config); + final ManagedCursor c1 = ledger.openCursor("c1"); + final AtomicReference lastPosition = new AtomicReference<>(); + + final int num = 101; + final CountDownLatch addEntryLatch = new CountDownLatch(num); + for (int i = 0; i < num; i++) { + String entryStr = "entry-" + i; + ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + lastPosition.set(position); + c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() { + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + + @Override + public void markDeleteComplete(Object ctx) { + addEntryLatch.countDown(); + } + }, null); + + } + }, null); + } + addEntryLatch.await(); + assertEquals(c1.getNumberOfEntries(), 0); + + // Reopen + @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory2.open("async_mark_delete_blocking_test_ledger"); + ManagedCursor c2 = ledger.openCursor("c1"); + + // flaky test case: c2.getMarkDeletedPosition() may be equals lastPositionLedgerId+1 or lastPositionLedgerId+2, + // the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover + // See PR https://github.com/apache/pulsar/pull/25087. + Awaitility.await() + .untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get())); } @Test(timeOut = 20000) @@ -1404,7 +1564,7 @@ void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception { final ManagedCursor c1 = ledger.openCursor("c1"); final int num = 100; - List positions = new ArrayList(); + List positions = new ArrayList<>(); for (int i = 0; i < num; i++) { Position p = ledger.addEntry("dummy-entry".getBytes(Encoding)); positions.add(p); @@ -1433,10 +1593,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + // Add retry logic here to prove open operation will finally success despite race condition. + ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger")); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition); + assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition); } @Test(timeOut = 20000) @@ -4509,7 +4670,7 @@ public void testLazyCursorLedgerCreation() throws Exception { ledger.close(); } - @Test(groups = "flaky") + @Test(timeOut = 20000) public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Exception { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); ManagedLedgerImpl ledger = @@ -4521,7 +4682,8 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti ledger = (ManagedLedgerImpl) factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig); assertNotNull(ledger.getCursors().get("test")); ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test"); - assertEquals(cursor1.getMarkDeletedPosition(), p1); + // Reopen ledger may move cursor to next position. See PR https://github.com/apache/pulsar/pull/25087. + assertThat(cursor1.getMarkDeletedPosition()).isGreaterThanOrEqualTo(p1); factory2.shutdown(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 69927f1ebaf66..d53036df26b2c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -50,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -532,7 +534,7 @@ void markDeleteSkippingMessage() throws Exception { assertEquals(cursor.getReadPosition(), PositionFactory.create(p4.getLedgerId(), p4.getEntryId() + 1)); } - @Test(timeOut = 20000, groups = "flaky") + @Test(timeOut = 20000) public void asyncMarkDeleteBlocking() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(10); @@ -567,16 +569,18 @@ public void markDeleteComplete(Object ctx) { } latch.await(); - assertEquals(c1.getNumberOfEntries(), 0); // Reopen @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); - ledger = factory2.open("my_test_ledger"); + ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger")); ManagedCursor c2 = ledger.openCursor("c1"); - assertEquals(c2.getMarkDeletedPosition(), lastPosition.get()); + // Since all entries are consumed, we should move mark delete position to nextLedgerId:-1. + // See PR https://github.com/apache/pulsar/pull/25087. + Awaitility.await().untilAsserted( + () -> assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get())); } @Test(timeOut = 20000) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java new file mode 100644 index 0000000000000..0acca723f8f61 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class ManagedLedgerTestUtil { + + public static T retry(ThrowingSupplier supplier) { + return retry(10, supplier); + } + + public static T retry(int retryCount, ThrowingSupplier supplier) { + for (int i = 0; i < retryCount; i++) { + if (i > 0) { + try { + log.info("Retrying after 100ms {}/{}", i, retryCount); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + try { + return supplier.get(); + } catch (Exception e) { + log.warn("Failed to execute supplier: {}", supplier, e); + } + } + throw new RuntimeException("Failed to execute supplier after " + retryCount + " retries"); + } + + @FunctionalInterface + public interface ThrowingSupplier { + T get() throws Exception; + } + +}