Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -60,6 +62,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -110,6 +113,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.collections4.iterators.EmptyIterator;
Expand Down Expand Up @@ -579,7 +583,10 @@ void testNumberOfEntriesWithReopen() throws Exception {

@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
// Add retry logic here to prove open operation will finally success despite race condition.
ledger = ManagedLedgerTestUtil.retry(
() -> factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)));


c1 = ledger.openCursor("c1");
c2 = ledger.openCursor("c2");
Expand Down Expand Up @@ -1248,7 +1255,8 @@ void testRemoveCursorFail() throws Exception {

@Test(timeOut = 20000)
void cursorPersistence() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
// Open cursor_persistence_ledger ledger, create ledger 3.
ManagedLedger ledger = factory.open("cursor_persistence_ledger");
ManagedCursor c1 = ledger.openCursor("c1");
ManagedCursor c2 = ledger.openCursor("c2");
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Expand All @@ -1260,26 +1268,28 @@ void cursorPersistence() throws Exception {

List<Entry> entries = c1.readEntries(3);
Position p1 = entries.get(2).getPosition();
// Mark delete, create ledger 4 due to cursor ledger state is NoLedger.
c1.markDelete(p1);
entries.forEach(Entry::release);

entries = c1.readEntries(4);
Position p2 = entries.get(2).getPosition();
// Mark delete, create ledger 5 due to cursor ledger state is NoLedger.
c2.markDelete(p2);
entries.forEach(Entry::release);

// Reopen

@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory2.open("my_test_ledger");
// Recovery open cursor_persistence_ledger ledger, create ledger 6, and move mark delete position to 6:-1.
// See PR https://github.com/apache/pulsar/pull/25087.
ledger = factory2.open("cursor_persistence_ledger");
c1 = ledger.openCursor("c1");
c2 = ledger.openCursor("c2");

assertEquals(c1.getMarkDeletedPosition(), p1);
// move mark-delete-position from 3:5 to 6:-1 since all the entries have been consumed
ManagedCursor finalC2 = c2;
Awaitility.await().untilAsserted(() -> assertNotEquals(finalC2.getMarkDeletedPosition(), p2));
Awaitility.await().untilAsserted(() -> assertThat(finalC2.getMarkDeletedPosition()).isGreaterThan(p2));
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -1350,7 +1360,7 @@ void cursorPersistence2() throws Exception {
assertEquals(c4.getMarkDeletedPosition(), p1);
}

@Test(groups = "flaky")
@Test(timeOut = 20000)
public void asyncMarkDeleteBlocking() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
Expand Down Expand Up @@ -1385,16 +1395,166 @@ public void markDeleteComplete(Object ctx) {
}

latch.await();
assertEquals(c1.getNumberOfEntries(), 0);

// Reopen
@Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
// flaky test case: factory2.open() may throw MetadataStoreException$BadVersionException, race condition:
// 1. factory2.open() triggers ledger recovery, read versionA ManagedLedgerInfo of my_test_ledger ledger.
// 2. my_test_ledger ledger rollover triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionB
// ManagedLedgerInfo into metaStore.
// 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(), update versionA ManagedLedgerInfo
// into metaStore, then throws BadVersionException and moves my_test_ledger ledger to fenced state.
// Recovery open async_mark_delete_blocking_test_ledger ledger, ledgerId++
// Add retry logic here to prove open operation will finally success despite race condition.
ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger"));
ManagedCursor c2 = ledger.openCursor("c1");

// Three cases:
// 1. cursor recovered with lastPosition markDeletePosition
// 2. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger not rolled over, we
// move markDeletePosition to (lastPositionLedgerId+2:-1)
// 3. cursor recovered with (lastPositionLedgerId+1:-1) markDeletePosition, cursor ledger rolled over, we
// move markDeletePosition to (lastPositionLedgerId+3:-1)
// See PR https://github.com/apache/pulsar/pull/25087.
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get());
}

@Test(timeOut = 20000)
public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
// open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config);
final ManagedCursor c1 = ledger.openCursor("c1");
final AtomicReference<Position> lastPosition = new AtomicReference<>();
// just for log debug purpose
Deque<Position> positions = new ConcurrentLinkedDeque<>();

// In previous flaky test, we set num=100, PR https://github.com/apache/pulsar/pull/25087 will make the test
// more flaky. Flaky case:
// 1. cursor recovered with markDeletePosition 12:9, persistentMarkDeletePosition 12:9.
// 2. cursor recovered with mark markDeletePosition 13:-1, persistentMarkDeletePosition 13:-1.
// Here, we set num to 101, make sure the ledger 13 is created and become the active(last) ledger,
// and cursor will always be recovered with markDeletePosition 13:0, persistentMarkDeletePosition 13:0.
final int num = 101;
final CountDownLatch addEntryLatch = new CountDownLatch(num);
// 10 entries per ledger, create ledger 4~13
for (int i = 0; i < num; i++) {
String entryStr = "entry-" + i;
ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
lastPosition.set(position);
positions.offer(position);
addEntryLatch.countDown();
}
}, null);
}
addEntryLatch.await();

// If we set num=100, to avoid flaky test, we should add Thread.sleep(1000) here to make sure ledger rollover
// is finished, but this sleep can not guarantee c1 always recovered with markDeletePosition 12:9.
// Thread.sleep(1000);

final CountDownLatch markDeleteLatch = new CountDownLatch(1);
// Mark delete, create ledger 14 due to cursor ledger state is NoLedger.
// The num=100 flaky test case, markDelete operation is triggered twice:
// 1. first is triggered by c1.asyncMarkDelete(), markDeletePosition is 12:9.
// 2. second is triggered by ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover,
// The entries in ledger 12 are all consumed, and we move persistentMarkDeletePosition and
// markDeletePosition to 13:-1 due to PR https://github.com/apache/pulsar/pull/25087.
// Before this pr, we will not move persistentMarkDeletePosition.
// Two markDelete operations is almost triggered at the same time without order guarantee:
// 1. main thread triggered c1.asyncMarkDelete.
// 2. bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggered create ledger 13 due to ledger full
// rollover by OpAddEntry.
// OpAddEntry will close and create a new ledger when closeWhenDone is true.
// In ManagedLedgerImpl class, MetaStoreCallback cb calls maybeUpdateCursorBeforeTrimmingConsumedLedger(),
// which calls cursor.asyncMarkDelete(), so markDelete operation in ledger rollover may execute after
// AddEntryCallback.addComplete(). The root cause is cursor.asyncMarkDelete() does not propagate completion or
// failure to it caller callback
c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void markDeleteComplete(Object ctx) {
markDeleteLatch.countDown();
}
}, null);
markDeleteLatch.await();
assertEquals(c1.getNumberOfEntries(), 0);

// Reopen
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory2.open("my_test_ledger");
@Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
// Recovery open async_mark_delete_blocking_test_ledger ledger, create ledger 15.
// When executing ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the curPointedLedger is 13,
// the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 13:0,
// so we will move markDeletePosition to 15:-1, see PR https://github.com/apache/pulsar/pull/25087.
ledger = factory2.open("async_mark_delete_blocking_test_ledger");
ManagedCursor c2 = ledger.openCursor("c1");

assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
log.info("positions size: {}, positions: {}", positions.size(), positions);
// To make sure ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, we should
// wait until c2.getMarkDeletedPosition() equals 15:-1, see PR https://github.com/apache/pulsar/pull/25087.
Awaitility.await()
.untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
}

@Test(timeOut = 20000)
public void asyncMarkDeleteBlockingWithMultiShots() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setMetadataMaxEntriesPerLedger(5);
ManagedLedger ledger = factory.open("async_mark_delete_blocking_test_ledger", config);
final ManagedCursor c1 = ledger.openCursor("c1");
final AtomicReference<Position> lastPosition = new AtomicReference<>();

final int num = 101;
final CountDownLatch addEntryLatch = new CountDownLatch(num);
for (int i = 0; i < num; i++) {
String entryStr = "entry-" + i;
ledger.asyncAddEntry(entryStr.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
lastPosition.set(position);
c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void markDeleteComplete(Object ctx) {
addEntryLatch.countDown();
}
}, null);

}
}, null);
}
addEntryLatch.await();
assertEquals(c1.getNumberOfEntries(), 0);

// Reopen
@Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory2.open("async_mark_delete_blocking_test_ledger");
ManagedCursor c2 = ledger.openCursor("c1");

// flaky test case: c2.getMarkDeletedPosition() may be equals lastPositionLedgerId+1 or lastPositionLedgerId+2,
// the last c1.asyncMarkDelete() operation may trigger a cursor ledger rollover
// See PR https://github.com/apache/pulsar/pull/25087.
Awaitility.await()
.untilAsserted(() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
}

@Test(timeOut = 20000)
Expand All @@ -1404,7 +1564,7 @@ void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception {
final ManagedCursor c1 = ledger.openCursor("c1");

final int num = 100;
List<Position> positions = new ArrayList();
List<Position> positions = new ArrayList<>();
for (int i = 0; i < num; i++) {
Position p = ledger.addEntry("dummy-entry".getBytes(Encoding));
positions.add(p);
Expand Down Expand Up @@ -1433,10 +1593,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
// Reopen
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory2.open("my_test_ledger");
// Add retry logic here to prove open operation will finally success despite race condition.
ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger"));
ManagedCursor c2 = ledger.openCursor("c1");

assertEquals(c2.getMarkDeletedPosition(), lastPosition);
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -4509,7 +4670,7 @@ public void testLazyCursorLedgerCreation() throws Exception {
ledger.close();
}

@Test(groups = "flaky")
@Test(timeOut = 20000)
public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger =
Expand All @@ -4521,7 +4682,8 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti
ledger = (ManagedLedgerImpl) factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig);
assertNotNull(ledger.getCursors().get("test"));
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor1.getMarkDeletedPosition(), p1);
// Reopen ledger may move cursor to next position. See PR https://github.com/apache/pulsar/pull/25087.
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThanOrEqualTo(p1);
factory2.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
Expand Down Expand Up @@ -532,7 +534,7 @@ void markDeleteSkippingMessage() throws Exception {
assertEquals(cursor.getReadPosition(), PositionFactory.create(p4.getLedgerId(), p4.getEntryId() + 1));
}

@Test(timeOut = 20000, groups = "flaky")
@Test(timeOut = 20000)
public void asyncMarkDeleteBlocking() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
Expand Down Expand Up @@ -567,16 +569,18 @@ public void markDeleteComplete(Object ctx) {
}

latch.await();

assertEquals(c1.getNumberOfEntries(), 0);

// Reopen
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory2.open("my_test_ledger");
ledger = ManagedLedgerTestUtil.retry(() -> factory2.open("my_test_ledger"));
ManagedCursor c2 = ledger.openCursor("c1");

assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
// Since all entries are consumed, we should move mark delete position to nextLedgerId:-1.
// See PR https://github.com/apache/pulsar/pull/25087.
Awaitility.await().untilAsserted(
() -> assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get()));
}

@Test(timeOut = 20000)
Expand Down
Loading
Loading