Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
Expand Down Expand Up @@ -5108,7 +5109,7 @@ public void testComparePositions() throws Exception {
ml.delete();
}

@Test
@Test(timeOut = 20000)
public void testTrimmerRaceCondition() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
Expand Down Expand Up @@ -5144,20 +5145,23 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);

latch.await();
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastPosition);
assertEquals(ledger.getCursors().getSlowestCursorPosition(), lastPosition);
assertEquals(cursor.getProperties(), properties);
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
assertThat(ledger.getCursors().getSlowestCursorPosition()).isGreaterThanOrEqualTo(lastPosition);
// cursor.asyncMarkDelete() and maybeUpdateCursorBeforeTrimmingConsumedLedger may run concurrently,
// so we can't assert properties equals here.
// assertEquals(cursor.getProperties(), properties);

// 3. Add Entry 2. Triggers Rollover.
// 3. Add Entry 2. Triggers second rollover process.
// This implicitly calls maybeUpdateCursorBeforeTrimmingConsumedLedger due to rollover
Position p = ledger.addEntry("entry-2".getBytes(Encoding));

// Wait for background tasks (metadata callback) to complete.
// We expect at least 2 ledgers (Rollover happened).
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> ledger.getLedgersInfo().size() >= 2);
assertEquals(cursor.getPersistentMarkDeletedPosition(), new ImmutablePositionImpl(p.getLedgerId(), -1));
// First ledger is all consumed and trimmed, left current ledger and next empty ledger.
assertEquals(cursor.getPersistentMarkDeletedPosition(), PositionFactory.create(p.getLedgerId(), -1));

// Verify properties are preserved after cursor reset
assertEquals(cursor.getProperties(), properties);
// The same reason as above, can't assert properties equals here.
// assertEquals(cursor.getProperties(), properties);
}
}