Skip to content

Commit 30e25e8

Browse files
lhotarinikhil-ctds
authored andcommitted
[improve][test] Add solution to PulsarMockBookKeeper for intercepting reads (apache#23875)
(cherry picked from commit 87fb442) (cherry picked from commit 38f15bc)
1 parent 69e638a commit 30e25e8

File tree

4 files changed

+68
-13
lines changed

4 files changed

+68
-13
lines changed

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import java.util.concurrent.ScheduledExecutorService;
4141
import java.util.concurrent.TimeUnit;
4242
import java.util.concurrent.atomic.AtomicLong;
43+
import lombok.Getter;
44+
import lombok.Setter;
4345
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
4446
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
4547
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -96,6 +98,9 @@ public static Collection<BookieId> getMockEnsemble() {
9698
final Queue<Long> addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>();
9799
final List<CompletableFuture<Void>> failures = new ArrayList<>();
98100
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
101+
@Setter
102+
@Getter
103+
private volatile PulsarMockReadHandleInterceptor readHandleInterceptor;
99104

100105
public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
101106
this.orderedExecutor = orderedExecutor;
@@ -250,7 +255,8 @@ public CompletableFuture<ReadHandle> execute() {
250255
return FutureUtils.exception(new BKException.BKUnauthorizedAccessException());
251256
} else {
252257
return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
253-
lh.getLedgerMetadata(), lh.entries));
258+
lh.getLedgerMetadata(), lh.entries,
259+
PulsarMockBookKeeper.this::getReadHandleInterceptor));
254260
}
255261
});
256262
}

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
7373
this.digest = digest;
7474
this.passwd = Arrays.copyOf(passwd, passwd.length);
7575

76-
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries);
76+
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor);
7777
}
7878

7979
@Override

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.function.Supplier;
2425
import lombok.extern.slf4j.Slf4j;
2526
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
2627
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -40,28 +41,36 @@ class PulsarMockReadHandle implements ReadHandle {
4041
private final long ledgerId;
4142
private final LedgerMetadata metadata;
4243
private final List<LedgerEntryImpl> entries;
44+
private final Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier;
4345

4446
PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata,
45-
List<LedgerEntryImpl> entries) {
47+
List<LedgerEntryImpl> entries,
48+
Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier) {
4649
this.bk = bk;
4750
this.ledgerId = ledgerId;
4851
this.metadata = metadata;
4952
this.entries = entries;
53+
this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
5054
}
5155

5256
@Override
5357
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
5458
return bk.getProgrammedFailure().thenComposeAsync((res) -> {
55-
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
56-
List<LedgerEntry> seq = new ArrayList<>();
57-
long entryId = firstEntry;
58-
while (entryId <= lastEntry && entryId < entries.size()) {
59-
seq.add(entries.get((int) entryId++).duplicate());
60-
}
61-
log.debug("Entries read: {}", seq);
62-
63-
return FutureUtils.value(LedgerEntriesImpl.create(seq));
64-
});
59+
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
60+
List<LedgerEntry> seq = new ArrayList<>();
61+
long entryId = firstEntry;
62+
while (entryId <= lastEntry && entryId < entries.size()) {
63+
seq.add(entries.get((int) entryId++).duplicate());
64+
}
65+
log.debug("Entries read: {}", seq);
66+
LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq);
67+
PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = readHandleInterceptorSupplier.get();
68+
if (pulsarMockReadHandleInterceptor != null) {
69+
return pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry, lastEntry,
70+
ledgerEntries);
71+
}
72+
return FutureUtils.value(ledgerEntries);
73+
});
6574
}
6675

6776
@Override
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.client;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import org.apache.bookkeeper.client.api.LedgerEntries;
23+
24+
/**
25+
* Interceptor interface for intercepting read handle readAsync operations.
26+
* This is useful for testing purposes, for example for introducing delays.
27+
*/
28+
public interface PulsarMockReadHandleInterceptor {
29+
/**
30+
* Intercepts the readAsync operation on a read handle.
31+
*
32+
* @param ledgerId ledger id
33+
* @param firstEntry first entry to read
34+
* @param lastEntry last entry to read
35+
* @param entries entries that would be returned by the read operation
36+
* @return CompletableFuture that will complete with the entries to return
37+
*/
38+
CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry,
39+
LedgerEntries entries);
40+
}

0 commit comments

Comments
 (0)