Skip to content

Commit fc981ba

Browse files
authored
Feature: auto recover support repaired not adhering placement ledger (#3359)
1 parent 1adc041 commit fc981ba

16 files changed

+1301
-75
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
568568
bookieQuarantineRatio = 1.0;
569569
}
570570

571-
private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
571+
protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
572572
DNSToSwitchMapping dnsResolver,
573573
HashedWheelTimer timer,
574574
FeatureProvider featureProvider,

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import java.io.IOException;
3434
import java.util.ArrayList;
3535
import java.util.Collection;
36+
import java.util.Collections;
3637
import java.util.Enumeration;
3738
import java.util.HashMap;
39+
import java.util.HashSet;
3840
import java.util.Iterator;
3941
import java.util.LinkedList;
4042
import java.util.List;
@@ -90,6 +92,7 @@
9092
import org.apache.bookkeeper.stats.StatsLogger;
9193
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
9294
import org.apache.bookkeeper.util.IOUtils;
95+
import org.apache.commons.collections4.MapUtils;
9396
import org.apache.zookeeper.AsyncCallback;
9497
import org.apache.zookeeper.KeeperException;
9598
import org.slf4j.Logger;
@@ -1095,7 +1098,7 @@ private Map<Integer, BookieId> getReplacementBookiesByIndexes(
10951098
oldBookie,
10961099
bookiesToExclude);
10971100
BookieId newBookie = replaceBookieResponse.getResult();
1098-
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
1101+
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
10991102
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
11001103
if (LOG.isDebugEnabled()) {
11011104
LOG.debug(
@@ -1129,14 +1132,23 @@ private ArrayList<BookieId> replaceBookiesInEnsemble(
11291132
* @param ledgerFragment
11301133
* - LedgerFragment to replicate
11311134
*/
1132-
public void replicateLedgerFragment(LedgerHandle lh,
1133-
final LedgerFragment ledgerFragment,
1134-
final BiConsumer<Long, Long> onReadEntryFailureCallback)
1135-
throws InterruptedException, BKException {
1136-
Optional<Set<BookieId>> excludedBookies = Optional.empty();
1137-
Map<Integer, BookieId> targetBookieAddresses =
1138-
getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
1139-
ledgerFragment.getBookiesIndexes(), excludedBookies);
1135+
public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
1136+
final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
1137+
Map<Integer, BookieId> targetBookieAddresses = null;
1138+
if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {
1139+
Optional<Set<BookieId>> excludedBookies = Optional.empty();
1140+
targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
1141+
ledgerFragment.getBookiesIndexes(), excludedBookies);
1142+
} else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) {
1143+
targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(),
1144+
lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize());
1145+
ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet());
1146+
}
1147+
if (MapUtils.isEmpty(targetBookieAddresses)) {
1148+
LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.",
1149+
ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId());
1150+
return;
1151+
}
11401152
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
11411153
}
11421154

@@ -1777,6 +1789,31 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI
17771789
ackQuorumSize);
17781790
}
17791791

1792+
public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList,
1793+
int writeQuorumSize, int ackQuorumSize) {
1794+
try {
1795+
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> placementResult = bkc.getPlacementPolicy()
1796+
.replaceToAdherePlacementPolicy(ensembleBookiesList.size(), writeQuorumSize, ackQuorumSize,
1797+
new HashSet<>(), ensembleBookiesList);
1798+
if (PlacementPolicyAdherence.FAIL != placementResult.getAdheringToPolicy()) {
1799+
Map<Integer, BookieId> targetMap = new HashMap<>();
1800+
List<BookieId> newEnsembles = placementResult.getResult();
1801+
for (int i = 0; i < ensembleBookiesList.size(); i++) {
1802+
BookieId originBookie = ensembleBookiesList.get(i);
1803+
BookieId newBookie = newEnsembles.get(i);
1804+
if (!originBookie.equals(newBookie)) {
1805+
targetMap.put(i, newBookie);
1806+
}
1807+
}
1808+
return targetMap;
1809+
}
1810+
} catch (UnsupportedOperationException e) {
1811+
LOG.warn("The placement policy: {} didn't support replaceToAdherePlacementPolicy, "
1812+
+ "ignore replace not adhere bookie.", bkc.getPlacementPolicy().getClass().getName());
1813+
}
1814+
return Collections.emptyMap();
1815+
}
1816+
17801817
/**
17811818
* Makes async request for getting list of entries of ledger from a bookie
17821819
* and returns Future for the result.

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize,
269269
newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
270270
customMetadata, new HashSet<BookieId>(quarantinedBookiesSet));
271271
socketAddresses = newEnsembleResponse.getResult();
272-
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
272+
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
273273
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
274274
ensembleNotAdheringToPlacementPolicy.inc();
275275
if (ensembleSize > 1) {
@@ -286,7 +286,7 @@ public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize,
286286
newEnsembleResponse = placementPolicy.newEnsemble(
287287
ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
288288
socketAddresses = newEnsembleResponse.getResult();
289-
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
289+
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
290290
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
291291
ensembleNotAdheringToPlacementPolicy.inc();
292292
log.warn("New ensemble: {} is not adhering to Placement Policy", socketAddresses);
@@ -317,7 +317,7 @@ public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuor
317317
ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
318318
existingBookies, addr, excludedBookiesAndQuarantinedBookies);
319319
socketAddress = replaceBookieResponse.getResult();
320-
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
320+
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
321321
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
322322
ensembleNotAdheringToPlacementPolicy.inc();
323323
log.warn(
@@ -333,7 +333,7 @@ public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuor
333333
replaceBookieResponse = placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
334334
customMetadata, existingBookies, addr, excludeBookies);
335335
socketAddress = replaceBookieResponse.getResult();
336-
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
336+
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
337337
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
338338
ensembleNotAdheringToPlacementPolicy.inc();
339339
log.warn(

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
440440
return true;
441441
}
442442

443+
/**
444+
* Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
445+
* adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
446+
*
447+
* @param ensembleSize
448+
* ensemble size
449+
* @param writeQuorumSize
450+
* writeQuorumSize of the ensemble
451+
* @param ackQuorumSize
452+
* ackQuorumSize of the ensemble
453+
* @param excludeBookies
454+
* bookies that should not be considered as targets
455+
* @param currentEnsemble
456+
* current ensemble
457+
* @return a placement result
458+
*/
459+
default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
460+
int ensembleSize,
461+
int writeQuorumSize,
462+
int ackQuorumSize,
463+
Set<BookieId> excludeBookies,
464+
List<BookieId> currentEnsemble) {
465+
throw new UnsupportedOperationException();
466+
}
467+
443468
/**
444469
* enum for PlacementPolicyAdherence. Currently we are supporting tri-value
445470
* enum for PlacementPolicyAdherence. If placement policy is met strictly
@@ -481,8 +506,16 @@ public T getResult() {
481506
return result;
482507
}
483508

509+
/**
510+
* Use {@link #getAdheringToPolicy}.
511+
*/
512+
@Deprecated
484513
public PlacementPolicyAdherence isAdheringToPolicy() {
485514
return policyAdherence;
486515
}
516+
517+
public PlacementPolicyAdherence getAdheringToPolicy() {
518+
return policyAdherence;
519+
}
487520
}
488521
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public class LedgerFragment {
3939
private final long ledgerId;
4040
private final DistributionSchedule schedule;
4141
private final boolean isLedgerClosed;
42+
private ReplicateType replicateType = ReplicateType.DATA_LOSS;
4243

43-
LedgerFragment(LedgerHandle lh,
44+
public LedgerFragment(LedgerHandle lh,
4445
long firstEntryId,
4546
long lastKnownEntryId,
4647
Set<Integer> bookieIndexes) {
@@ -56,7 +57,7 @@ public class LedgerFragment {
5657
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
5758
}
5859

59-
LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
60+
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
6061
this.ledgerId = lf.ledgerId;
6162
this.firstEntryId = lf.firstEntryId;
6263
this.lastKnownEntryId = lf.lastKnownEntryId;
@@ -91,7 +92,7 @@ public boolean isClosed() {
9192
return isLedgerClosed;
9293
}
9394

94-
long getLedgerId() {
95+
public long getLedgerId() {
9596
return ledgerId;
9697
}
9798

@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
217218
return this.ensemble;
218219
}
219220

221+
public ReplicateType getReplicateType() {
222+
return replicateType;
223+
}
224+
225+
public void setReplicateType(ReplicateType replicateType) {
226+
this.replicateType = replicateType;
227+
}
228+
220229
@Override
221230
public String toString() {
222231
return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
223232
+ "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
224233
getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
225234
getAddresses(), isLedgerClosed);
226235
}
236+
237+
/**
 * Reason a fragment is being replicated.
 */
public enum ReplicateType {
    /** Presumably: data held by some of the fragment's bookies is lost or unavailable. */
    DATA_LOSS,
    /** Presumably: the fragment's ensemble does not adhere to the placement policy. */
    DATA_NOT_ADHERING_PLACEMENT
}
227244
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,28 @@ public BookieNode selectFromNetworkLocation(
235235
}
236236
}
237237

238+
@Override
239+
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
240+
int ensembleSize,
241+
int writeQuorumSize,
242+
int ackQuorumSize,
243+
Set<BookieId> excludeBookies,
244+
List<BookieId> currentEnsemble) {
245+
final PlacementResult<List<BookieId>> placementResult =
246+
super.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
247+
excludeBookies, currentEnsemble);
248+
if (placementResult.getAdheringToPolicy() != PlacementPolicyAdherence.FAIL) {
249+
return placementResult;
250+
} else {
251+
if (slave == null) {
252+
return placementResult;
253+
} else {
254+
return slave.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
255+
excludeBookies, currentEnsemble);
256+
}
257+
}
258+
}
259+
238260
@Override
239261
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
240262
super.handleBookiesThatLeft(leftBookies);

0 commit comments

Comments
 (0)