Skip to content

Commit 6363f8c

Browse files
horizonzyhangc
authored andcommitted
Feature: auto recover support repaired not adhering placement ledger (#3359)
(cherry picked from commit fc981ba)
1 parent c3a9943 commit 6363f8c

16 files changed

+1301
-75
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
571571
bookieQuarantineRatio = 1.0;
572572
}
573573

574-
private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
574+
protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
575575
DNSToSwitchMapping dnsResolver,
576576
HashedWheelTimer timer,
577577
FeatureProvider featureProvider,

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
import java.io.IOException;
3333
import java.util.ArrayList;
3434
import java.util.Collection;
35+
import java.util.Collections;
3536
import java.util.Enumeration;
3637
import java.util.HashMap;
38+
import java.util.HashSet;
3739
import java.util.Iterator;
3840
import java.util.LinkedList;
3941
import java.util.List;
@@ -89,6 +91,7 @@
8991
import org.apache.bookkeeper.stats.StatsLogger;
9092
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
9193
import org.apache.bookkeeper.util.IOUtils;
94+
import org.apache.commons.collections4.MapUtils;
9295
import org.apache.zookeeper.AsyncCallback;
9396
import org.apache.zookeeper.KeeperException;
9497
import org.slf4j.Logger;
@@ -1094,7 +1097,7 @@ private Map<Integer, BookieId> getReplacementBookiesByIndexes(
10941097
oldBookie,
10951098
bookiesToExclude);
10961099
BookieId newBookie = replaceBookieResponse.getResult();
1097-
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
1100+
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
10981101
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
10991102
if (LOG.isDebugEnabled()) {
11001103
LOG.debug(
@@ -1128,14 +1131,23 @@ private ArrayList<BookieId> replaceBookiesInEnsemble(
11281131
* @param ledgerFragment
11291132
* - LedgerFragment to replicate
11301133
*/
1131-
public void replicateLedgerFragment(LedgerHandle lh,
1132-
final LedgerFragment ledgerFragment,
1133-
final BiConsumer<Long, Long> onReadEntryFailureCallback)
1134-
throws InterruptedException, BKException {
1135-
Optional<Set<BookieId>> excludedBookies = Optional.empty();
1136-
Map<Integer, BookieId> targetBookieAddresses =
1137-
getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
1138-
ledgerFragment.getBookiesIndexes(), excludedBookies);
1134+
public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
1135+
final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
1136+
Map<Integer, BookieId> targetBookieAddresses = null;
1137+
if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {
1138+
Optional<Set<BookieId>> excludedBookies = Optional.empty();
1139+
targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
1140+
ledgerFragment.getBookiesIndexes(), excludedBookies);
1141+
} else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) {
1142+
targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(),
1143+
lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize());
1144+
ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet());
1145+
}
1146+
if (MapUtils.isEmpty(targetBookieAddresses)) {
1147+
LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.",
1148+
ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId());
1149+
return;
1150+
}
11391151
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
11401152
}
11411153

@@ -1776,6 +1788,31 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI
17761788
ackQuorumSize);
17771789
}
17781790

1791+
public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList,
1792+
int writeQuorumSize, int ackQuorumSize) {
1793+
try {
1794+
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> placementResult = bkc.getPlacementPolicy()
1795+
.replaceToAdherePlacementPolicy(ensembleBookiesList.size(), writeQuorumSize, ackQuorumSize,
1796+
new HashSet<>(), ensembleBookiesList);
1797+
if (PlacementPolicyAdherence.FAIL != placementResult.getAdheringToPolicy()) {
1798+
Map<Integer, BookieId> targetMap = new HashMap<>();
1799+
List<BookieId> newEnsembles = placementResult.getResult();
1800+
for (int i = 0; i < ensembleBookiesList.size(); i++) {
1801+
BookieId originBookie = ensembleBookiesList.get(i);
1802+
BookieId newBookie = newEnsembles.get(i);
1803+
if (!originBookie.equals(newBookie)) {
1804+
targetMap.put(i, newBookie);
1805+
}
1806+
}
1807+
return targetMap;
1808+
}
1809+
} catch (UnsupportedOperationException e) {
1810+
LOG.warn("The placement policy: {} didn't support replaceToAdherePlacementPolicy, "
1811+
+ "ignore replace not adhere bookie.", bkc.getPlacementPolicy().getClass().getName());
1812+
}
1813+
return Collections.emptyMap();
1814+
}
1815+
17791816
/**
17801817
* Makes async request for getting list of entries of ledger from a bookie
17811818
* and returns Future for the result.

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize,
270270
newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
271271
customMetadata, new HashSet<BookieId>(quarantinedBookiesSet));
272272
socketAddresses = newEnsembleResponse.getResult();
273-
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
273+
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
274274
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
275275
ensembleNotAdheringToPlacementPolicy.inc();
276276
if (ensembleSize > 1) {
@@ -287,7 +287,7 @@ public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize,
287287
newEnsembleResponse = placementPolicy.newEnsemble(
288288
ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
289289
socketAddresses = newEnsembleResponse.getResult();
290-
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
290+
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
291291
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
292292
ensembleNotAdheringToPlacementPolicy.inc();
293293
log.warn("New ensemble: {} is not adhering to Placement Policy", socketAddresses);
@@ -318,7 +318,7 @@ public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuor
318318
ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
319319
existingBookies, addr, excludedBookiesAndQuarantinedBookies);
320320
socketAddress = replaceBookieResponse.getResult();
321-
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
321+
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
322322
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
323323
ensembleNotAdheringToPlacementPolicy.inc();
324324
log.warn(
@@ -334,7 +334,7 @@ public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuor
334334
replaceBookieResponse = placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
335335
customMetadata, existingBookies, addr, excludeBookies);
336336
socketAddress = replaceBookieResponse.getResult();
337-
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
337+
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
338338
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
339339
ensembleNotAdheringToPlacementPolicy.inc();
340340
log.warn(

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
441441
return true;
442442
}
443443

444+
/**
445+
* Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
446+
* adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
447+
*
448+
* @param ensembleSize
449+
* ensemble size
450+
* @param writeQuorumSize
451+
* writeQuorumSize of the ensemble
452+
* @param ackQuorumSize
453+
* ackQuorumSize of the ensemble
454+
* @param excludeBookies
455+
* bookies that should not be considered as targets
456+
* @param currentEnsemble
457+
* current ensemble
458+
* @return a placement result
459+
*/
460+
default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
461+
int ensembleSize,
462+
int writeQuorumSize,
463+
int ackQuorumSize,
464+
Set<BookieId> excludeBookies,
465+
List<BookieId> currentEnsemble) {
466+
throw new UnsupportedOperationException();
467+
}
468+
444469
/**
445470
* enum for PlacementPolicyAdherence. Currently we are supporting tri-value
446471
* enum for PlacementPolicyAdherence. If placement policy is met strictly
@@ -482,8 +507,16 @@ public T getResult() {
482507
return result;
483508
}
484509

510+
/**
511+
* Use {@link #getAdheringToPolicy}.
512+
*/
513+
@Deprecated
485514
public PlacementPolicyAdherence isAdheringToPolicy() {
486515
return policyAdherence;
487516
}
517+
518+
public PlacementPolicyAdherence getAdheringToPolicy() {
519+
return policyAdherence;
520+
}
488521
}
489522
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public class LedgerFragment {
3939
private final long ledgerId;
4040
private final DistributionSchedule schedule;
4141
private final boolean isLedgerClosed;
42+
private ReplicateType replicateType = ReplicateType.DATA_LOSS;
4243

43-
LedgerFragment(LedgerHandle lh,
44+
public LedgerFragment(LedgerHandle lh,
4445
long firstEntryId,
4546
long lastKnownEntryId,
4647
Set<Integer> bookieIndexes) {
@@ -56,7 +57,7 @@ public class LedgerFragment {
5657
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
5758
}
5859

59-
LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
60+
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
6061
this.ledgerId = lf.ledgerId;
6162
this.firstEntryId = lf.firstEntryId;
6263
this.lastKnownEntryId = lf.lastKnownEntryId;
@@ -91,7 +92,7 @@ public boolean isClosed() {
9192
return isLedgerClosed;
9293
}
9394

94-
long getLedgerId() {
95+
public long getLedgerId() {
9596
return ledgerId;
9697
}
9798

@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
217218
return this.ensemble;
218219
}
219220

221+
public ReplicateType getReplicateType() {
222+
return replicateType;
223+
}
224+
225+
public void setReplicateType(ReplicateType replicateType) {
226+
this.replicateType = replicateType;
227+
}
228+
220229
@Override
221230
public String toString() {
222231
return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
223232
+ "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
224233
getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
225234
getAddresses(), isLedgerClosed);
226235
}
236+
237+
/**
238+
* ReplicateType.
239+
*/
240+
public enum ReplicateType {
241+
DATA_LOSS,
242+
DATA_NOT_ADHERING_PLACEMENT
243+
}
227244
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,28 @@ public BookieNode selectFromNetworkLocation(
236236
}
237237
}
238238

239+
@Override
240+
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
241+
int ensembleSize,
242+
int writeQuorumSize,
243+
int ackQuorumSize,
244+
Set<BookieId> excludeBookies,
245+
List<BookieId> currentEnsemble) {
246+
final PlacementResult<List<BookieId>> placementResult =
247+
super.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
248+
excludeBookies, currentEnsemble);
249+
if (placementResult.getAdheringToPolicy() != PlacementPolicyAdherence.FAIL) {
250+
return placementResult;
251+
} else {
252+
if (slave == null) {
253+
return placementResult;
254+
} else {
255+
return slave.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
256+
excludeBookies, currentEnsemble);
257+
}
258+
}
259+
}
260+
239261
@Override
240262
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
241263
super.handleBookiesThatLeft(leftBookies);

0 commit comments

Comments
 (0)