Skip to content

Commit b838cd1

Browse files
committed
Fix ManagedLedgerTest.testManagedLedgerWithReadEntryTimeOut
1 parent e9b6a94 commit b838cd1

File tree

1 file changed

+35
-26
lines changed

1 file changed

+35
-26
lines changed

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.apache.bookkeeper.client.LedgerHandle;
9595
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
9696
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
97+
import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
9798
import org.apache.bookkeeper.client.api.LedgerEntries;
9899
import org.apache.bookkeeper.client.api.LedgerMetadata;
99100
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -3133,17 +3134,26 @@ public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
31333134
ManagedLedgerConfig config = new ManagedLedgerConfig().setReadEntryTimeoutSeconds(1);
31343135
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);
31353136

3136-
BookKeeper bk = mock(BookKeeper.class);
3137-
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
3137+
Position position = ledger.addEntry("entry-1".getBytes());
3138+
3139+
// ensure that the read isn't cached
3140+
factory.getEntryCacheManager().clear();
3141+
3142+
bkc.setReadHandleInterceptor(new PulsarMockReadHandleInterceptor() {
3143+
@Override
3144+
public CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry,
3145+
LedgerEntries entries) {
3146+
return CompletableFuture.supplyAsync(() -> {
3147+
return entries;
3148+
}, CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
3149+
}
3150+
});
3151+
31383152
AtomicReference<ManagedLedgerException> responseException1 = new AtomicReference<>();
31393153
String ctxStr = "timeoutCtx";
3140-
CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
3141-
ReadHandle ledgerHandle = mock(ReadHandle.class);
3142-
doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionFactory.EARLIEST.getLedgerId(),
3143-
PositionFactory.EARLIEST.getEntryId());
31443154

31453155
// (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
3146-
ledger.asyncReadEntry(ledgerHandle, PositionFactory.EARLIEST, new ReadEntryCallback() {
3156+
ledger.asyncReadEntry(position, new ReadEntryCallback() {
31473157
@Override
31483158
public void readEntryComplete(Entry entry, Object ctx) {
31493159
responseException1.set(null);
@@ -3155,18 +3165,20 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
31553165
responseException1.set(exception);
31563166
}
31573167
}, ctxStr);
3158-
ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, Collections.emptyMap());
3159-
retryStrategically((test) -> responseException1.get() != null, 5, 1000);
3160-
assertNotNull(responseException1.get());
3161-
assertTrue(responseException1.get().getMessage()
3162-
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
31633168

3164-
// (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
3165-
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
3166-
Position readPositionRef = PositionFactory.EARLIEST;
3167-
ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger, "cursor1");
3168-
OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() {
3169+
Awaitility.await().untilAsserted(() -> {
3170+
assertNotNull(responseException1.get());
3171+
assertTrue(responseException1.get().getMessage()
3172+
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
3173+
});
31693174

3175+
// ensure that the read isn't cached
3176+
factory.getEntryCacheManager().clear();
3177+
3178+
// (2) test read-timeout for: ManagedCursor.asyncReadEntries(..)
3179+
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
3180+
ManagedCursor cursor = ledger.openCursor("cursor1", InitialPosition.Earliest);
3181+
cursor.asyncReadEntries(1, new ReadEntriesCallback() {
31703182
@Override
31713183
public void readEntriesComplete(List<Entry> entries, Object ctx) {
31723184
}
@@ -3176,16 +3188,13 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
31763188
assertEquals(ctxStr, (String) ctx);
31773189
responseException2.set(exception);
31783190
}
3191+
}, ctxStr, PositionFactory.LATEST);
31793192

3180-
}, null, PositionFactory.LATEST, null);
3181-
ledger.asyncReadEntry(ledgerHandle, PositionFactory.EARLIEST.getEntryId(), PositionFactory.EARLIEST.getEntryId(),
3182-
opReadEntry, ctxStr);
3183-
retryStrategically((test) -> {
3184-
return responseException2.get() != null;
3185-
}, 5, 1000);
3186-
assertNotNull(responseException2.get());
3187-
assertTrue(responseException2.get().getMessage()
3188-
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
3193+
Awaitility.await().untilAsserted(() -> {
3194+
assertNotNull(responseException2.get());
3195+
assertTrue(responseException2.get().getMessage()
3196+
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
3197+
});
31893198

31903199
ledger.close();
31913200
}

0 commit comments

Comments
 (0)