Skip to content

Commit b445bfc

Browse files
ange-klhotari
authored andcommitted
Optimize reorderReadSequence to check WriteSet instead of entire Ensemble (#4478)
(cherry picked from commit 0376bdc)
1 parent e5a119d commit b445bfc

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -865,8 +865,9 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
865865
if (useRegionAware || reorderReadsRandom) {
866866
isAnyBookieUnavailable = true;
867867
} else {
868-
for (int i = 0; i < ensemble.size(); i++) {
869-
BookieId bookieAddr = ensemble.get(i);
868+
for (int i = 0; i < writeSet.size(); i++) {
869+
int idx = writeSet.get(i);
870+
BookieId bookieAddr = ensemble.get(idx);
870871
if ((!knownBookies.containsKey(bookieAddr) && !readOnlyBookies.contains(bookieAddr))
871872
|| slowBookies.getIfPresent(bookieAddr) != null) {
872873
// Found at least one bookie not available in the ensemble, or in slowBookies

bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.bookkeeper.stats.Gauge;
6161
import org.apache.bookkeeper.stats.NullStatsLogger;
6262
import org.apache.bookkeeper.test.TestStatsProvider;
63+
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
6364
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
6465
import org.apache.bookkeeper.util.StaticDNSResolver;
6566
import org.apache.commons.collections4.CollectionUtils;
@@ -2267,6 +2268,44 @@ public void testNodeWithFailures() throws Exception {
22672268
StaticDNSResolver.reset();
22682269
}
22692270

2271+
@Test
2272+
public void testSlowBookieInEnsembleOnly() throws Exception {
2273+
repp.uninitalize();
2274+
updateMyRack("/r1/rack1");
2275+
2276+
TestStatsProvider statsProvider = new TestStatsProvider();
2277+
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
2278+
2279+
repp = new RackawareEnsemblePlacementPolicy();
2280+
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
2281+
DISABLE_ALL, statsLogger, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
2282+
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
2283+
2284+
TestOpStatsLogger readRequestsReorderedCounter = (TestOpStatsLogger) statsLogger
2285+
.getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
2286+
2287+
// Update cluster
2288+
Set<BookieId> addrs = new HashSet<BookieId>();
2289+
addrs.add(addr1.toBookieId());
2290+
addrs.add(addr2.toBookieId());
2291+
addrs.add(addr3.toBookieId());
2292+
addrs.add(addr4.toBookieId());
2293+
repp.onClusterChanged(addrs, new HashSet<BookieId>());
2294+
repp.registerSlowBookie(addr1.toBookieId(), 0L);
2295+
Map<BookieId, Long> bookiePendingMap = new HashMap<>();
2296+
bookiePendingMap.put(addr1.toBookieId(), 1L);
2297+
repp.onClusterChanged(addrs, new HashSet<>());
2298+
2299+
DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 2, 3);
2300+
2301+
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
2302+
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
2303+
2304+
// If the slow bookie is only present in the ensemble, no reordering occurs.
2305+
assertEquals(writeSet, reorderSet);
2306+
assertEquals(0, readRequestsReorderedCounter.getSuccessCount());
2307+
}
2308+
22702309
@Test
22712310
public void testReplaceNotAvailableBookieWithDefaultRack() throws Exception {
22722311
repp.uninitalize();

0 commit comments

Comments
 (0)