Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.bookkeeper.tools.cli.commands.bookie.RegenerateInterleavedStorageIndexFileCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.UpdateBookieInLedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.CorrectEnsemblePlacementCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.DecommissionCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.EndpointInfoCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
Expand Down Expand Up @@ -160,6 +161,7 @@ public class BookieShell implements Tool {
static final String CMD_CHECK_DB_LEDGERS_INDEX = "check-db-ledgers-index";
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file";
static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";
static final String CMD_CORRECT_ENSEMBLE_PLACEMENT = "correct-ensemble-placement";

// cookie commands
static final String CMD_CREATE_COOKIE = "cookie_create";
Expand Down Expand Up @@ -2257,6 +2259,58 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}

class CorrectEnsemblePlacementCmd extends MyCommand {
final Options opts = new Options();

public CorrectEnsemblePlacementCmd() {
super(CMD_CORRECT_ENSEMBLE_PLACEMENT);
Option ledgerOption = new Option("l", "ledgerids", true,
"Target ledger IDs to relocate."
+ " Multiple can be specified, comma separated.");
ledgerOption.setRequired(true);
ledgerOption.setValueSeparator(',');
ledgerOption.setArgs(Option.UNLIMITED_VALUES);

opts.addOption(ledgerOption);
opts.addOption("dr", "dryrun", false,
"Printing the relocation plan w/o doing actual relocation");
opts.addOption("f", "force", false,
"Force relocation without confirmation");
opts.addOption("sk", "skipOpenLedgers", false,
"Skip relocating open ledgers");
}

@Override
Options getOptions() {
return opts;
}

@Override
String getDescription() {
return "Relocate ledgers to adhere ensemble placement policy.";
}

@Override
String getUsage() {
return CMD_CORRECT_ENSEMBLE_PLACEMENT
+ " --ledgerids <ledgerId[,ledgerId,...]> [--dryrun] [--force] [--skipOpenLedgers]";
}

@Override
int runCmd(CommandLine cmdLine) throws Exception {
final CorrectEnsemblePlacementCommand cmd = new CorrectEnsemblePlacementCommand();
final CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags
flags = new CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags();
final List<Long> ledgerIds = Arrays.stream(cmdLine.getOptionValues("ledgerids")).map(Long::parseLong)
.collect(Collectors.toList());
flags.ledgerIds(ledgerIds);
flags.dryRun(cmdLine.hasOption("dryrun"));
flags.force(cmdLine.hasOption("force"));
flags.skipOpenLedgers(cmdLine.hasOption("skipOpenLedgers"));
return cmd.apply(bkConf, flags) ? 0 : 1;
}
}

final Map<String, Command> commands = new HashMap<>();

{
Expand Down Expand Up @@ -2302,6 +2356,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
commands.put(CMD_FORCEAUDITCHECKS, new ForceAuditorChecksCmd());
commands.put(CMD_CORRECT_ENSEMBLE_PLACEMENT, new CorrectEnsemblePlacementCmd());
// cookie related commands
commands.put(CMD_CREATE_COOKIE,
new CreateCookieCommand().asShellCommand(CMD_CREATE_COOKIE, bkConf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,7 @@ OrderedScheduler getScheduler() {
return scheduler;
}

@VisibleForTesting
EnsemblePlacementPolicy getPlacementPolicy() {
public EnsemblePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ public void replicateLedgerFragment(LedgerHandle lh,
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
}

private void replicateLedgerFragment(LedgerHandle lh,
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
final Map<Integer, BookieId> targetBookieAddresses,
final BiConsumer<Long, Long> onReadEntryFailureCallback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
return true;
}

/**
* Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
* adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
*
* @param ensembleSize
* ensemble size
* @param writeQuorumSize
* writeQuorumSize of the ensemble
* @param ackQuorumSize
* ackQuorumSize of the ensemble
* @param excludeBookies
* bookies that should not be considered as targets
* @param currentEnsemble
* current ensemble
* @return a placement result
*/
default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieId> excludeBookies,
List<BookieId> currentEnsemble) {
throw new UnsupportedOperationException();
}

/**
* enum for PlacementPolicyAdherence. Currently we are supporting tri-value
* enum for PlacementPolicyAdherence. If placement policy is met strictly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class LedgerFragment {
private final DistributionSchedule schedule;
private final boolean isLedgerClosed;

LedgerFragment(LedgerHandle lh,
public LedgerFragment(LedgerHandle lh,
long firstEntryId,
long lastKnownEntryId,
Set<Integer> bookieIndexes) {
Expand All @@ -56,7 +56,7 @@ public class LedgerFragment {
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
}

LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
this.ledgerId = lf.ledgerId;
this.firstEntryId = lf.firstEntryId;
this.lastKnownEntryId = lf.lastKnownEntryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,28 @@ public BookieNode selectFromNetworkLocation(
}
}

@Override
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some case, the replace policy will replace more bookie. There are two racks.(rack1, rack2).
rack1: bookie1,bookie2,bookie3,bookie4
rack2: bookie5,bookie6,bookie7,bookie8
E:6 WQ:2 AQ:2. The ensemble is (5,6,1,7,2,8). (rack2,rack2,rack1,rack2,rack1,rack2), only replace the first bookie5 to bookie3, it adhere the placement policy. But now the implements didn't replace the firstly, it will replace from second element(bookie6), it will replace more bookie.

Copy link
Contributor Author

@equanz equanz Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. We can't "minimize" replacement bookies in some cases by the current approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can list the kinds of replace result, and pick the "minimize" result. We can pick the different start index to replace. such as start with index0, start with index1, .... Finally, we pick the "minimize" replace result.

Copy link
Contributor Author

@equanz equanz Aug 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can list the kinds of replace result, and pick the "minimize" result.

Yeah.
I think, alternatively, if the minimization is too costly for calculation, reduce as much as possible in a realistic amount of time like the current approach.

int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieId> excludeBookies,
List<BookieId> currentEnsemble) {
final PlacementResult<List<BookieId>> placementResult =
super.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
excludeBookies, currentEnsemble);
if (placementResult.isAdheringToPolicy() != PlacementPolicyAdherence.FAIL) {
return placementResult;
} else {
if (slave == null) {
return placementResult;
} else {
return slave.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
excludeBookies, currentEnsemble);
}
}
}

@Override
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
super.handleBookiesThatLeft(leftBookies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
Expand All @@ -69,6 +71,7 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1071,4 +1074,173 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
}
return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
}

@Override
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieId> excludeBookies,
List<BookieId> currentEnsemble) {
rwLock.readLock().lock();
try {
final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
.map(this::convertBookieToNode).collect(Collectors.toList());
final Set<Node> excludeNodes = convertBookiesToNodes(
addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
final RRTopologyAwareCoverageEnsemble ensemble =
new RRTopologyAwareCoverageEnsemble(
ensembleSize,
writeQuorumSize,
ackQuorumSize,
RACKNAME_DISTANCE_FROM_LEAVES,
null,
null,
minNumRacksPerWriteQuorumForThisEnsemble);

int numRacks = topology.getNumOfRacks();
// only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
}

BookieNode prevNode = null;
final BookieNode firstNode = provisionalEnsembleNodes.get(0);
// use same bookie at first to reduce ledger replication
if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
&& ensemble.addNode(firstNode)) {
excludeNodes.add(firstNode);
prevNode = firstNode;
}

for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
final String curRack;
if (null == prevNode) {
if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) {
curRack = NodeBase.ROOT;
} else {
curRack = localNode.getNetworkLocation();
}
} else {
curRack = "~" + prevNode.getNetworkLocation();
}

try {
prevNode = replaceToAdherePlacementPolicyInternal(
curRack, excludeNodes, ensemble, ensemble,
provisionalEnsembleNodes, i, ensembleSize, minNumRacksPerWriteQuorumForThisEnsemble);
// got a good candidate
if (ensemble.addNode(prevNode)) {
// add the candidate to exclude set
excludeNodes.add(prevNode);
} else {
throw new BKNotEnoughBookiesException();
}
// replace to newer node
provisionalEnsembleNodes.set(i, prevNode);
} catch (BKNotEnoughBookiesException e) {
LOG.warn("Skip ensemble relocation because the cluster has not enough bookies.");
return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
}
}
List<BookieId> bookieList = ensemble.toList();
if (ensembleSize != bookieList.size()) {
LOG.warn("Not enough {} bookies are available to form an ensemble : {}.",
ensembleSize, bookieList);
return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
}
return PlacementResult.of(bookieList,
isEnsembleAdheringToPlacementPolicy(
bookieList, writeQuorumSize, ackQuorumSize));
} finally {
rwLock.readLock().unlock();
}
}

private BookieNode replaceToAdherePlacementPolicyInternal(
String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
Ensemble<BookieNode> ensemble, List<BookieNode> provisionalEnsembleNodes, int ensembleIndex,
int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble) throws BKNotEnoughBookiesException {
final BookieNode currentNode = provisionalEnsembleNodes.get(ensembleIndex);
// if the current bookie could be applied to the ensemble, apply it to minify the number of bookies replaced
if (!excludeBookies.contains(currentNode) && predicate.apply(currentNode, ensemble)) {
return currentNode;
}

final List<Pair<String, List<BookieNode>>> conditionList = new ArrayList<>();
final Set<String> preExcludeRacks = new HashSet<>();
final Set<String> postExcludeRacks = new HashSet<>();
for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++) {
preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex - i - 1), ensembleSize))
.getNetworkLocation());
postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex + i + 1), ensembleSize))
.getNetworkLocation());
}
// adhere minNumRacksPerWriteQuorum by preExcludeRacks
// avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
// avoid to use first candidate bookies for election by provisionalEnsembleNodes
conditionList.add(Pair.of(
"~" + String.join(",",
Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
provisionalEnsembleNodes
));
// avoid to use same rack between previous index by netPath
// avoid to use first candidate bookies for election by provisionalEnsembleNodes
conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
// avoid to use same rack between previous index by netPath
conditionList.add(Pair.of(netPath, Collections.emptyList()));

for (Pair<String, List<BookieNode>> condition : conditionList) {
WeightedRandomSelection<BookieNode> wRSelection = null;

final List<Node> leaves = new ArrayList<>(topology.getLeaves(condition.getLeft()));
if (!isWeighted) {
Collections.shuffle(leaves);
} else {
if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
throw new BKNotEnoughBookiesException();
}
wRSelection = prepareForWeightedSelection(leaves);
if (wRSelection == null) {
throw new BKNotEnoughBookiesException();
}
}

final Iterator<Node> it = leaves.iterator();
final Set<Node> bookiesSeenSoFar = new HashSet<>();
while (true) {
Node n;
if (isWeighted) {
if (bookiesSeenSoFar.size() == leaves.size()) {
// Don't loop infinitely.
break;
}
n = wRSelection.getNextRandom();
bookiesSeenSoFar.add(n);
} else {
if (it.hasNext()) {
n = it.next();
} else {
break;
}
}
if (excludeBookies.contains(n)) {
continue;
}
if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
continue;
}
// additional excludeBookies
if (condition.getRight().contains(n)) {
continue;
}
BookieNode bn = (BookieNode) n;
return bn;
}
}

throw new BKNotEnoughBookiesException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -802,12 +802,16 @@ protected String resolveNetworkLocation(BookieId addr) {
protected Set<Node> convertBookiesToNodes(Collection<BookieId> excludeBookies) {
Set<Node> nodes = new HashSet<Node>();
for (BookieId addr : excludeBookies) {
BookieNode bn = knownBookies.get(addr);
if (null == bn) {
bn = createBookieNode(addr);
}
nodes.add(bn);
nodes.add(convertBookieToNode(addr));
}
return nodes;
}

protected BookieNode convertBookieToNode(BookieId addr) {
BookieNode bn = knownBookies.get(addr);
if (null == bn) {
bn = createBookieNode(addr);
}
return bn;
}
}
Loading