Skip to content

Commit 4a50fc0

Browse files
committed
feat: add relocate placement command
1 parent e4a2b54 commit 4a50fc0

File tree

9 files changed

+610
-10
lines changed

9 files changed

+610
-10
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.bookkeeper.tools.cli.commands.bookie.RegenerateInterleavedStorageIndexFileCommand;
7272
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
7373
import org.apache.bookkeeper.tools.cli.commands.bookie.UpdateBookieInLedgerCommand;
74+
import org.apache.bookkeeper.tools.cli.commands.bookies.CorrectEnsemblePlacementCommand;
7475
import org.apache.bookkeeper.tools.cli.commands.bookies.DecommissionCommand;
7576
import org.apache.bookkeeper.tools.cli.commands.bookies.EndpointInfoCommand;
7677
import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
@@ -160,6 +161,7 @@ public class BookieShell implements Tool {
160161
static final String CMD_CHECK_DB_LEDGERS_INDEX = "check-db-ledgers-index";
161162
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file";
162163
static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";
164+
static final String CMD_CORRECT_ENSEMBLE_PLACEMENT = "correct-ensemble-placement";
163165

164166
// cookie commands
165167
static final String CMD_CREATE_COOKIE = "cookie_create";
@@ -2257,6 +2259,58 @@ int runCmd(CommandLine cmdLine) throws Exception {
22572259
}
22582260
}
22592261

2262+
class CorrectEnsemblePlacementCmd extends MyCommand {
2263+
final Options opts = new Options();
2264+
2265+
public CorrectEnsemblePlacementCmd() {
2266+
super(CMD_CORRECT_ENSEMBLE_PLACEMENT);
2267+
Option ledgerOption = new Option("l", "ledgerids", true,
2268+
"Target ledger IDs to relocate."
2269+
+ " Multiple can be specified, comma separated.");
2270+
ledgerOption.setRequired(true);
2271+
ledgerOption.setValueSeparator(',');
2272+
ledgerOption.setArgs(Option.UNLIMITED_VALUES);
2273+
2274+
opts.addOption(ledgerOption);
2275+
opts.addOption("dr", "dryrun", false,
2276+
"Printing the relocation plan w/o doing actual relocation");
2277+
opts.addOption("f", "force", false,
2278+
"Force relocation without confirmation");
2279+
opts.addOption("sk", "skipOpenLedgers", false,
2280+
"Skip relocating open ledgers");
2281+
}
2282+
2283+
@Override
2284+
Options getOptions() {
2285+
return opts;
2286+
}
2287+
2288+
@Override
2289+
String getDescription() {
2290+
return "Relocate ledgers to adhere ensemble placement policy.";
2291+
}
2292+
2293+
@Override
2294+
String getUsage() {
2295+
return CMD_CORRECT_ENSEMBLE_PLACEMENT
2296+
+ " --ledgerids <ledgerId[,ledgerId,...]> [--dryrun] [--force] [--skipOpenLedgers]";
2297+
}
2298+
2299+
@Override
2300+
int runCmd(CommandLine cmdLine) throws Exception {
2301+
final CorrectEnsemblePlacementCommand cmd = new CorrectEnsemblePlacementCommand();
2302+
final CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags
2303+
flags = new CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags();
2304+
final List<Long> ledgerIds = Arrays.stream(cmdLine.getOptionValues("ledgerids")).map(Long::parseLong)
2305+
.collect(Collectors.toList());
2306+
flags.ledgerIds(ledgerIds);
2307+
flags.dryRun(cmdLine.hasOption("dryrun"));
2308+
flags.force(cmdLine.hasOption("force"));
2309+
flags.skipOpenLedgers(cmdLine.hasOption("skipOpenLedgers"));
2310+
return cmd.apply(bkConf, flags) ? 0 : 1;
2311+
}
2312+
}
2313+
22602314
final Map<String, Command> commands = new HashMap<>();
22612315

22622316
{
@@ -2302,6 +2356,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
23022356
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
23032357
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
23042358
commands.put(CMD_FORCEAUDITCHECKS, new ForceAuditorChecksCmd());
2359+
commands.put(CMD_CORRECT_ENSEMBLE_PLACEMENT, new CorrectEnsemblePlacementCmd());
23052360
// cookie related commands
23062361
commands.put(CMD_CREATE_COOKIE,
23072362
new CreateCookieCommand().asShellCommand(CMD_CREATE_COOKIE, bkConf));

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,8 +696,7 @@ OrderedScheduler getScheduler() {
696696
return scheduler;
697697
}
698698

699-
@VisibleForTesting
700-
EnsemblePlacementPolicy getPlacementPolicy() {
699+
/**
 * Returns the ensemble placement policy held in the {@code placementPolicy} field.
 *
 * @return the {@link EnsemblePlacementPolicy} stored in {@code placementPolicy}
 */
public EnsemblePlacementPolicy getPlacementPolicy() {
701700
return placementPolicy;
702701
}
703702

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,7 @@ public void replicateLedgerFragment(LedgerHandle lh,
11371137
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
11381138
}
11391139

1140-
private void replicateLedgerFragment(LedgerHandle lh,
1140+
public void replicateLedgerFragment(LedgerHandle lh,
11411141
final LedgerFragment ledgerFragment,
11421142
final Map<Integer, BookieId> targetBookieAddresses,
11431143
final BiConsumer<Long, Long> onReadEntryFailureCallback)

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
441441
return true;
442442
}
443443

444+
/**
 * Returns a placement result for an existing ensemble. If {@code currentEnsemble} does not
 * adhere to the placement policy, the result carries a new ensemble that does. Implementations
 * should minimize the number of bookies that are replaced.
 *
 * <p>The default implementation does not compute anything and always throws
 * {@link UnsupportedOperationException}; policies that support relocation must override it.
 *
 * @param ensembleSize
 *            ensemble size
 * @param writeQuorumSize
 *            writeQuorumSize of the ensemble
 * @param ackQuorumSize
 *            ackQuorumSize of the ensemble
 * @param excludeBookies
 *            bookies that should not be considered as targets
 * @param currentEnsemble
 *            current ensemble
 * @return a placement result
 * @throws UnsupportedOperationException
 *             always, unless the method is overridden
 */
460+
default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
461+
int ensembleSize,
462+
int writeQuorumSize,
463+
int ackQuorumSize,
464+
Set<BookieId> excludeBookies,
465+
List<BookieId> currentEnsemble) {
466+
throw new UnsupportedOperationException();
467+
}
468+
444469
/**
445470
* enum for PlacementPolicyAdherence. Currently we are supporting tri-value
446471
* enum for PlacementPolicyAdherence. If placement policy is met strictly

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class LedgerFragment {
4040
private final DistributionSchedule schedule;
4141
private final boolean isLedgerClosed;
4242

43-
LedgerFragment(LedgerHandle lh,
43+
public LedgerFragment(LedgerHandle lh,
4444
long firstEntryId,
4545
long lastKnownEntryId,
4646
Set<Integer> bookieIndexes) {
@@ -56,7 +56,7 @@ public class LedgerFragment {
5656
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
5757
}
5858

59-
LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
59+
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
6060
this.ledgerId = lf.ledgerId;
6161
this.firstEntryId = lf.firstEntryId;
6262
this.lastKnownEntryId = lf.lastKnownEntryId;

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,28 @@ public BookieNode selectFromNetworkLocation(
236236
}
237237
}
238238

239+
@Override
240+
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
241+
int ensembleSize,
242+
int writeQuorumSize,
243+
int ackQuorumSize,
244+
Set<BookieId> excludeBookies,
245+
List<BookieId> currentEnsemble) {
246+
final PlacementResult<List<BookieId>> placementResult =
247+
super.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
248+
excludeBookies, currentEnsemble);
249+
if (placementResult.isAdheringToPolicy() != PlacementPolicyAdherence.FAIL) {
250+
return placementResult;
251+
} else {
252+
if (slave == null) {
253+
return placementResult;
254+
} else {
255+
return slave.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
256+
excludeBookies, currentEnsemble);
257+
}
258+
}
259+
}
260+
239261
@Override
240262
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
241263
super.handleBookiesThatLeft(leftBookies);

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.concurrent.TimeUnit;
4747

4848
import java.util.concurrent.locks.ReentrantReadWriteLock;
49+
import java.util.stream.Collectors;
50+
import java.util.stream.Stream;
4951
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
5052
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
5153
import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
@@ -69,6 +71,7 @@
6971
import org.apache.bookkeeper.stats.StatsLogger;
7072
import org.apache.bookkeeper.stats.annotations.StatsDoc;
7173
import org.apache.commons.collections4.CollectionUtils;
74+
import org.apache.commons.lang3.tuple.Pair;
7275
import org.slf4j.Logger;
7376
import org.slf4j.LoggerFactory;
7477

@@ -1071,4 +1074,175 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
10711074
}
10721075
return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
10731076
}
1077+
1078+
@Override
1079+
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
1080+
int ensembleSize,
1081+
int writeQuorumSize,
1082+
int ackQuorumSize,
1083+
Set<BookieId> excludeBookies,
1084+
List<BookieId> currentEnsemble) {
1085+
rwLock.readLock().lock();
1086+
try {
1087+
final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
1088+
.map(this::convertBookieToNode).collect(Collectors.toList());
1089+
final Set<Node> excludeNodes = convertBookiesToNodes(
1090+
addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
1091+
int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
1092+
final RRTopologyAwareCoverageEnsemble ensemble =
1093+
new RRTopologyAwareCoverageEnsemble(
1094+
ensembleSize,
1095+
writeQuorumSize,
1096+
ackQuorumSize,
1097+
RACKNAME_DISTANCE_FROM_LEAVES,
1098+
null,
1099+
null,
1100+
minNumRacksPerWriteQuorumForThisEnsemble);
1101+
1102+
int numRacks = topology.getNumOfRacks();
1103+
// only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
1104+
if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
1105+
LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
1106+
return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
1107+
}
1108+
1109+
BookieNode prevNode = null;
1110+
final BookieNode firstNode = provisionalEnsembleNodes.get(0);
1111+
// use same bookie at first to reduce ledger replication
1112+
if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
1113+
&& ensemble.addNode(firstNode)) {
1114+
excludeNodes.add(firstNode);
1115+
prevNode = firstNode;
1116+
}
1117+
1118+
for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
1119+
final String curRack;
1120+
if (null == prevNode) {
1121+
if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) {
1122+
curRack = NodeBase.ROOT;
1123+
} else {
1124+
curRack = localNode.getNetworkLocation();
1125+
}
1126+
} else {
1127+
curRack = "~" + prevNode.getNetworkLocation();
1128+
}
1129+
1130+
try {
1131+
prevNode = replaceToAdherePlacementPolicyInternal(
1132+
curRack, excludeNodes, ensemble, ensemble,
1133+
provisionalEnsembleNodes, i, ensembleSize, minNumRacksPerWriteQuorumForThisEnsemble);
1134+
// replace to newer node
1135+
provisionalEnsembleNodes.set(i, prevNode);
1136+
} catch (BKNotEnoughBookiesException e) {
1137+
LOG.warn("Skip ensemble relocation because the cluster has not enough bookies.");
1138+
return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
1139+
}
1140+
}
1141+
List<BookieId> bookieList = ensemble.toList();
1142+
if (ensembleSize != bookieList.size()) {
1143+
LOG.warn("Not enough {} bookies are available to form an ensemble : {}.",
1144+
ensembleSize, bookieList);
1145+
return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
1146+
}
1147+
return PlacementResult.of(bookieList,
1148+
isEnsembleAdheringToPlacementPolicy(
1149+
bookieList, writeQuorumSize, ackQuorumSize));
1150+
} finally {
1151+
rwLock.readLock().unlock();
1152+
}
1153+
}
1154+
1155+
private BookieNode replaceToAdherePlacementPolicyInternal(
1156+
String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
1157+
Ensemble<BookieNode> ensemble, List<BookieNode> provisionalEnsembleNodes, int ensembleIndex,
1158+
int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble) throws BKNotEnoughBookiesException {
1159+
final BookieNode currentNode = provisionalEnsembleNodes.get(ensembleIndex);
1160+
// if the current bookie could be applied to the ensemble, apply it to minify the number of bookies replaced
1161+
if (!excludeBookies.contains(currentNode) && predicate.apply(currentNode, ensemble)) {
1162+
if (ensemble.addNode(currentNode)) {
1163+
// add the candidate to exclude set
1164+
excludeBookies.add(currentNode);
1165+
}
1166+
return currentNode;
1167+
}
1168+
1169+
final List<Pair<String, List<BookieNode>>> conditionList = new ArrayList<>();
1170+
final Set<String> preExcludeRacks = new HashSet<>();
1171+
final Set<String> postExcludeRacks = new HashSet<>();
1172+
for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++) {
1173+
preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex - i - 1), ensembleSize))
1174+
.getNetworkLocation());
1175+
postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex + i + 1), ensembleSize))
1176+
.getNetworkLocation());
1177+
}
1178+
// adhere minNumRacksPerWriteQuorum by preExcludeRacks
1179+
// avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
1180+
// avoid to use first candidate bookies for election by provisionalEnsembleNodes
1181+
conditionList.add(Pair.of(
1182+
"~" + String.join(",",
1183+
Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
1184+
provisionalEnsembleNodes
1185+
));
1186+
// avoid to use same rack between previous index by netPath
1187+
// avoid to use first candidate bookies for election by provisionalEnsembleNodes
1188+
conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
1189+
// avoid to use same rack between previous index by netPath
1190+
conditionList.add(Pair.of(netPath, Collections.emptyList()));
1191+
1192+
for (Pair<String, List<BookieNode>> condition : conditionList) {
1193+
WeightedRandomSelection<BookieNode> wRSelection = null;
1194+
1195+
final List<Node> leaves = new ArrayList<>(topology.getLeaves(condition.getLeft()));
1196+
if (!isWeighted) {
1197+
Collections.shuffle(leaves);
1198+
} else {
1199+
if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
1200+
throw new BKNotEnoughBookiesException();
1201+
}
1202+
wRSelection = prepareForWeightedSelection(leaves);
1203+
if (wRSelection == null) {
1204+
throw new BKNotEnoughBookiesException();
1205+
}
1206+
}
1207+
1208+
final Iterator<Node> it = leaves.iterator();
1209+
final Set<Node> bookiesSeenSoFar = new HashSet<>();
1210+
while (true) {
1211+
Node n;
1212+
if (isWeighted) {
1213+
if (bookiesSeenSoFar.size() == leaves.size()) {
1214+
// Don't loop infinitely.
1215+
break;
1216+
}
1217+
n = wRSelection.getNextRandom();
1218+
bookiesSeenSoFar.add(n);
1219+
} else {
1220+
if (it.hasNext()) {
1221+
n = it.next();
1222+
} else {
1223+
break;
1224+
}
1225+
}
1226+
if (excludeBookies.contains(n)) {
1227+
continue;
1228+
}
1229+
if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
1230+
continue;
1231+
}
1232+
// additional excludeBookies
1233+
if (condition.getRight().contains(n)) {
1234+
continue;
1235+
}
1236+
BookieNode bn = (BookieNode) n;
1237+
// got a good candidate
1238+
if (ensemble.addNode(bn)) {
1239+
// add the candidate to exclude set
1240+
excludeBookies.add(bn);
1241+
}
1242+
return bn;
1243+
}
1244+
}
1245+
1246+
throw new BKNotEnoughBookiesException();
1247+
}
10741248
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -802,12 +802,16 @@ protected String resolveNetworkLocation(BookieId addr) {
802802
protected Set<Node> convertBookiesToNodes(Collection<BookieId> excludeBookies) {
803803
Set<Node> nodes = new HashSet<Node>();
804804
for (BookieId addr : excludeBookies) {
805-
BookieNode bn = knownBookies.get(addr);
806-
if (null == bn) {
807-
bn = createBookieNode(addr);
808-
}
809-
nodes.add(bn);
805+
nodes.add(convertBookieToNode(addr));
810806
}
811807
return nodes;
812808
}
809+
810+
protected BookieNode convertBookieToNode(BookieId addr) {
811+
BookieNode bn = knownBookies.get(addr);
812+
if (null == bn) {
813+
bn = createBookieNode(addr);
814+
}
815+
return bn;
816+
}
813817
}

0 commit comments

Comments
 (0)