Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.bookkeeper.tools.cli.commands.bookie.RegenerateInterleavedStorageIndexFileCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.UpdateBookieInLedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.CorrectEnsemblePlacementCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.DecommissionCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.EndpointInfoCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
Expand Down Expand Up @@ -160,6 +161,7 @@ public class BookieShell implements Tool {
static final String CMD_CHECK_DB_LEDGERS_INDEX = "check-db-ledgers-index";
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file";
static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";
static final String CMD_CORRECT_ENSEMBLE_PLACEMENT = "correct-ensemble-placement";

// cookie commands
static final String CMD_CREATE_COOKIE = "cookie_create";
Expand Down Expand Up @@ -2257,6 +2259,58 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}

class CorrectEnsemblePlacementCmd extends MyCommand {
final Options opts = new Options();

public CorrectEnsemblePlacementCmd() {
super(CMD_CORRECT_ENSEMBLE_PLACEMENT);
Option ledgerOption = new Option("l", "ledgerids", true,
"Target ledger IDs to relocate."
+ " Multiple can be specified, comma separated.");
ledgerOption.setRequired(true);
ledgerOption.setValueSeparator(',');
ledgerOption.setArgs(Option.UNLIMITED_VALUES);

opts.addOption(ledgerOption);
opts.addOption("dr", "dryrun", false,
"Printing the relocation plan w/o doing actual relocation");
opts.addOption("f", "force", false,
"Force relocation without confirmation");
opts.addOption("sk", "skipOpenLedgers", false,
"Skip relocating open ledgers");
}

@Override
Options getOptions() {
return opts;
}

@Override
String getDescription() {
return "Relocate ledgers to adhere ensemble placement policy.";
}

@Override
String getUsage() {
return CMD_CORRECT_ENSEMBLE_PLACEMENT
+ " --ledgerids <ledgerId[,ledgerId,...]> [--dryrun] [--force] [--skipOpenLedgers]";
}

@Override
int runCmd(CommandLine cmdLine) throws Exception {
final CorrectEnsemblePlacementCommand cmd = new CorrectEnsemblePlacementCommand();
final CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags
flags = new CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags();
final List<Long> ledgerIds = Arrays.stream(cmdLine.getOptionValues("ledgerids")).map(Long::parseLong)
.collect(Collectors.toList());
flags.ledgerIds(ledgerIds);
flags.dryRun(cmdLine.hasOption("dryrun"));
flags.force(cmdLine.hasOption("force"));
flags.skipOpenLedgers(cmdLine.hasOption("skipOpenLedgers"));
return cmd.apply(bkConf, flags) ? 0 : 1;
}
}

final Map<String, Command> commands = new HashMap<>();

{
Expand Down Expand Up @@ -2302,6 +2356,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
commands.put(CMD_FORCEAUDITCHECKS, new ForceAuditorChecksCmd());
commands.put(CMD_CORRECT_ENSEMBLE_PLACEMENT, new CorrectEnsemblePlacementCmd());
// cookie related commands
commands.put(CMD_CREATE_COOKIE,
new CreateCookieCommand().asShellCommand(CMD_CREATE_COOKIE, bkConf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,7 @@ OrderedScheduler getScheduler() {
return scheduler;
}

@VisibleForTesting
EnsemblePlacementPolicy getPlacementPolicy() {
public EnsemblePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
import com.google.common.base.Functions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand All @@ -31,6 +32,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -50,6 +52,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.SneakyThrows;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
Expand Down Expand Up @@ -1137,7 +1141,7 @@ public void replicateLedgerFragment(LedgerHandle lh,
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
}

private void replicateLedgerFragment(LedgerHandle lh,
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
final Map<Integer, BookieId> targetBookieAddresses,
final BiConsumer<Long, Long> onReadEntryFailureCallback)
Expand Down Expand Up @@ -1225,6 +1229,108 @@ public void processResult(int rc, String s, Object ctx) {
}
}

/**
*
* @param lh Ledger Handle
* @param dryRun if true, run it without any modification.
* @return failed ledger fragment indices
* @throws UnsupportedOperationException Default behavior of
* {@link EnsemblePlacementPolicy#replaceToAdherePlacementPolicy(int, int, int, java.util.Set, java.util.List)}.
*/
public List<Long> relocateLedgerToAdherePlacementPolicy(LedgerHandle lh, boolean dryRun)
throws UnsupportedOperationException {
final EnsemblePlacementPolicy placementPolicy = bkc.getPlacementPolicy();

final long ledgerId = lh.getId();
final LedgerMetadata ledgerMeta = lh.getLedgerMetadata();
final List<Long> failedFragmentIndexList = new ArrayList<>();
final Map<Long, Long> ledgerFragmentsRange = new HashMap<>();
Long curEntryId = null;
for (Map.Entry<Long, ? extends List<BookieId>> entry :
ledgerMeta.getAllEnsembles().entrySet()) {
if (curEntryId != null) {
ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
}
curEntryId = entry.getKey();
}
if (curEntryId != null) {
ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
}

for (Map.Entry<Long, ? extends List<BookieId>> entry : ledgerMeta.getAllEnsembles().entrySet()) {
if (placementPolicy.isEnsembleAdheringToPlacementPolicy(entry.getValue(),
ledgerMeta.getWriteQuorumSize(), ledgerMeta.getAckQuorumSize())
== EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
final List<BookieId> currentEnsemble = entry.getValue();
// Currently, don't consider quarantinedBookies
final EnsemblePlacementPolicy.PlacementResult<List<BookieId>> placementResult =
placementPolicy.replaceToAdherePlacementPolicy(
ledgerMeta.getEnsembleSize(),
ledgerMeta.getWriteQuorumSize(),
ledgerMeta.getAckQuorumSize(),
Collections.emptySet(),
currentEnsemble);

if (placementResult.isAdheringToPolicy()
== EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
LOG.warn("Failed to relocate the ensemble. So, skip the operation."
+ " ledgerId: {}, fragmentIndex: {}",
ledgerId, entry.getKey());
failedFragmentIndexList.add(entry.getKey());
} else {
final List<BookieId> newEnsemble = placementResult.getResult();
final Map<Integer, BookieId> replaceBookiesMap = IntStream
.range(0, ledgerMeta.getEnsembleSize()).boxed()
.filter(i -> !newEnsemble.get(i).equals(currentEnsemble.get(i)))
.collect(Collectors.toMap(Functions.identity(), newEnsemble::get));
if (replaceBookiesMap.isEmpty()) {
LOG.warn("Failed to get bookies to replace. So, skip the operation."
+ " ledgerId: {}, fragmentIndex: {}",
ledgerId, entry.getKey());
failedFragmentIndexList.add(entry.getKey());
} else if (dryRun) {
LOG.info("Would replace the ensemble. ledgerId: {}, fragmentIndex: {},"
+ " currentEnsemble: {} replaceBookiesMap {}",
ledgerId, entry.getKey(),
currentEnsemble, replaceBookiesMap);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to replace the ensemble. ledgerId: {}, fragmentIndex: {},"
+ " replaceBookiesMap {}",
ledgerId, entry.getKey(), replaceBookiesMap);
}
final LedgerFragment fragment = new LedgerFragment(lh, entry.getKey(),
ledgerFragmentsRange.get(entry.getKey()), replaceBookiesMap.keySet());

try {
replicateLedgerFragment(lh, fragment, replaceBookiesMap,
(lId, eId) -> {
// This consumer is already accepted before the method returns
// void. Therefore, use failedFragmentIndexList in this consumer.
LOG.warn("Failed to read entry {}:{}", lId, eId);
failedFragmentIndexList.add(entry.getKey());
});
if (LOG.isDebugEnabled()) {
LOG.debug("Operation finished in the ensemble. ledgerId: {},"
+ " fragmentIndex: {}, replaceBookiesMap {}",
ledgerId, entry.getKey(), replaceBookiesMap);
}
} catch (BKException | InterruptedException e) {
LOG.warn("Failed to replicate ledger fragment.", e);
failedFragmentIndexList.add(entry.getKey());
}
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("The fragment is adhering to placement policy. So, skip the operation."
+ " ledgerId: {}, fragmentIndex: {}", ledgerId, entry.getKey());
}
}
}
return failedFragmentIndexList;
}

/**
* Format the BookKeeper metadata in zookeeper.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
return true;
}

/**
* Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
* adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
*
* @param ensembleSize
* ensemble size
* @param writeQuorumSize
* writeQuorumSize of the ensemble
* @param ackQuorumSize
* ackQuorumSize of the ensemble
* @param excludeBookies
* bookies that should not be considered as targets
* @param currentEnsemble
* current ensemble
* @return a placement result
*/
default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieId> excludeBookies,
List<BookieId> currentEnsemble) {
throw new UnsupportedOperationException();
}

/**
* enum for PlacementPolicyAdherence. Currently we are supporting tri-value
* enum for PlacementPolicyAdherence. If placement policy is met strictly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class LedgerFragment {
private final DistributionSchedule schedule;
private final boolean isLedgerClosed;

LedgerFragment(LedgerHandle lh,
public LedgerFragment(LedgerHandle lh,
long firstEntryId,
long lastKnownEntryId,
Set<Integer> bookieIndexes) {
Expand All @@ -56,7 +56,7 @@ public class LedgerFragment {
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
}

LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
this.ledgerId = lf.ledgerId;
this.firstEntryId = lf.firstEntryId;
this.lastKnownEntryId = lf.lastKnownEntryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,28 @@ public BookieNode selectFromNetworkLocation(
}
}

@Override
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some case, the replace policy will replace more bookie. There are two racks.(rack1, rack2).
rack1: bookie1,bookie2,bookie3,bookie4
rack2: bookie5,bookie6,bookie7,bookie8
E:6 WQ:2 AQ:2. The ensemble is (5,6,1,7,2,8). (rack2,rack2,rack1,rack2,rack1,rack2), only replace the first bookie5 to bookie3, it adhere the placement policy. But now the implements didn't replace the firstly, it will replace from second element(bookie6), it will replace more bookie.

Copy link
Contributor Author

@equanz equanz Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. We can't "minimize" replacement bookies in some cases by the current approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can list the kinds of replace result, and pick the "minimize" result. We can pick the different start index to replace. such as start with index0, start with index1, .... Finally, we pick the "minimize" replace result.

Copy link
Contributor Author

@equanz equanz Aug 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can list the kinds of replace result, and pick the "minimize" result.

Yeah.
I think, alternatively, if the minimization is too costly for calculation, reduce as much as possible in a realistic amount of time like the current approach.

int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieId> excludeBookies,
List<BookieId> currentEnsemble) {
final PlacementResult<List<BookieId>> placementResult =
super.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
excludeBookies, currentEnsemble);
if (placementResult.isAdheringToPolicy() != PlacementPolicyAdherence.FAIL) {
return placementResult;
} else {
if (slave == null) {
return placementResult;
} else {
return slave.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
excludeBookies, currentEnsemble);
}
}
}

@Override
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
super.handleBookiesThatLeft(leftBookies);
Expand Down
Loading