Skip to content
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb7384b
Support repaired not adhering placement policy ledger.
horizonzy Jun 24, 2022
0108c98
add unit test.
horizonzy Jun 25, 2022
f7a3b49
code clean.
horizonzy Jun 25, 2022
dd8d779
tuning doc.
horizonzy Jun 25, 2022
4a66b58
code tuning
horizonzy Jun 26, 2022
5433819
Complete the test case.
horizonzy Jun 27, 2022
12582a7
fix checkstyle.
horizonzy Jun 27, 2022
1e49b22
code clean.
horizonzy Jul 6, 2022
d274c79
do nothing when didn't find same rack bookies num > 1
horizonzy Jul 10, 2022
552f2d1
use replaceBookie.
horizonzy Jul 10, 2022
f6eeb35
fix check style.
horizonzy Jul 10, 2022
828fcd0
code clean.
horizonzy Jul 11, 2022
cba288f
enhance test cases.
horizonzy Jul 11, 2022
88808ef
enhance unit tests.
horizonzy Jul 11, 2022
44dcf84
enhance unit test.
horizonzy Jul 12, 2022
a2f33d0
code clean.
horizonzy Jul 12, 2022
0ff5d96
code clean.
horizonzy Jul 12, 2022
1170150
Merge branch 'master' into feature-auto-recover-match-placement
horizonzy Jul 13, 2022
733ba19
tuning allocator pooling concurrency.
horizonzy Jul 29, 2022
3f03848
make replaceNotAdheringPlacementPolicyBookie default implements.
horizonzy Jul 29, 2022
c9b3b5a
fix checkstyle.
horizonzy Jul 31, 2022
af1d763
remove duplicated dependency.
horizonzy Jul 31, 2022
e4dc79a
add config in read me file.
horizonzy Aug 1, 2022
15ea486
tuning readme file.
horizonzy Aug 4, 2022
7669c7a
address comment.
horizonzy Aug 4, 2022
5028740
feat: add relocate placement command
equanz Nov 15, 2021
c54b168
feat: add relocate placement command
equanz Mar 17, 2022
afc9a2c
enhance replaceToAdherePlacementPolicy.
horizonzy Aug 5, 2022
0b91805
enhance replaceToAdherePlacementPolicy.
horizonzy Aug 5, 2022
aef4e3e
use replaceToAdherePlacementPolicy to find target bookie.
horizonzy Aug 6, 2022
98147d7
fix checkstyle.
horizonzy Aug 6, 2022
a2de135
fix checkstyle.
horizonzy Aug 7, 2022
465f986
address comment.
horizonzy Aug 7, 2022
965bf96
address comment.
horizonzy Aug 8, 2022
edefdeb
fix check style.
horizonzy Aug 8, 2022
6ef1e2a
address comment.
horizonzy Aug 8, 2022
85bd141
reset StaticDNSResolver after test.
horizonzy Aug 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
bookieQuarantineRatio = 1.0;
}

private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change it just for test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

DNSToSwitchMapping dnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -90,6 +92,7 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1095,7 +1098,7 @@ private Map<Integer, BookieId> getReplacementBookiesByIndexes(
oldBookie,
bookiesToExclude);
BookieId newBookie = replaceBookieResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
if (LOG.isDebugEnabled()) {
LOG.debug(
Expand Down Expand Up @@ -1129,14 +1132,23 @@ private ArrayList<BookieId> replaceBookiesInEnsemble(
* @param ledgerFragment
* - LedgerFragment to replicate
*/
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
final BiConsumer<Long, Long> onReadEntryFailureCallback)
throws InterruptedException, BKException {
Optional<Set<BookieId>> excludedBookies = Optional.empty();
Map<Integer, BookieId> targetBookieAddresses =
getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
Map<Integer, BookieId> targetBookieAddresses = null;
if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between DATA_LOSS and DATA_NOT_ADHERING_PLACEMENT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With DATA_LOSS, the ledgerChecker has already identified the bad bookie that we want to replace.
With DATA_NOT_ADHERING_PLACEMENT, we only know that the ensemble does not adhere to the placement policy, so we need to find which bookie should be replaced.

Optional<Set<BookieId>> excludedBookies = Optional.empty();
targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
} else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) {
targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(),
lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize());
ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet());
}
if (MapUtils.isEmpty(targetBookieAddresses)) {
LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.",
ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId());
return;
}
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
}

Expand Down Expand Up @@ -1777,6 +1789,31 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI
ackQuorumSize);
}

/**
 * Asks the client's placement policy for an ensemble that adheres to the policy and
 * reports which positions of the given ensemble would have to change.
 *
 * <p>The policy is queried with an empty set of excluded bookies. When the policy
 * reports {@code PlacementPolicyAdherence.FAIL}, or when it does not support
 * {@code replaceToAdherePlacementPolicy} (signalled by an
 * {@link UnsupportedOperationException}), an empty map is returned.
 *
 * @param ensembleBookiesList
 *          the current ensemble; its size is passed to the policy as the ensemble size
 * @param writeQuorumSize
 *          write quorum size forwarded to the placement policy
 * @param ackQuorumSize
 *          ack quorum size forwarded to the placement policy
 * @return a map from ensemble index to the bookie proposed for that index, containing only
 *         the indexes whose bookie differs from the current one; empty if nothing can be proposed
 */
public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList,
        int writeQuorumSize, int ackQuorumSize) {
    try {
        final EnsemblePlacementPolicy.PlacementResult<List<BookieId>> proposal =
                bkc.getPlacementPolicy().replaceToAdherePlacementPolicy(
                        ensembleBookiesList.size(), writeQuorumSize, ackQuorumSize,
                        new HashSet<>(), ensembleBookiesList);
        if (PlacementPolicyAdherence.FAIL == proposal.getAdheringToPolicy()) {
            // The policy could not produce an adhering ensemble; nothing to replace.
            return Collections.emptyMap();
        }

        final List<BookieId> proposedEnsemble = proposal.getResult();
        final Map<Integer, BookieId> replacementsByIndex = new HashMap<>();
        int position = 0;
        while (position < ensembleBookiesList.size()) {
            final BookieId current = ensembleBookiesList.get(position);
            final BookieId proposed = proposedEnsemble.get(position);
            // Only positions whose bookie actually changed need to be re-replicated.
            if (!current.equals(proposed)) {
                replacementsByIndex.put(position, proposed);
            }
            position++;
        }
        return replacementsByIndex;
    } catch (UnsupportedOperationException e) {
        LOG.warn("The placement policy: {} didn't support replaceToAdherePlacementPolicy, "
                + "ignore replace not adhere bookie.", bkc.getPlacementPolicy().getClass().getName());
        return Collections.emptyMap();
    }
}

/**
* Makes async request for getting list of entries of ledger from a bookie
* and returns Future for the result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize,
newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, new HashSet<BookieId>(quarantinedBookiesSet));
socketAddresses = newEnsembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
ensembleNotAdheringToPlacementPolicy.inc();
if (ensembleSize > 1) {
Expand All @@ -286,7 +286,7 @@ public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize,
newEnsembleResponse = placementPolicy.newEnsemble(
ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
socketAddresses = newEnsembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn("New ensemble: {} is not adhering to Placement Policy", socketAddresses);
Expand Down Expand Up @@ -317,7 +317,7 @@ public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuor
ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
existingBookies, addr, excludedBookiesAndQuarantinedBookies);
socketAddress = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn(
Expand All @@ -333,7 +333,7 @@ public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuor
replaceBookieResponse = placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, existingBookies, addr, excludeBookies);
socketAddress = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
ensembleNotAdheringToPlacementPolicy.inc();
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
return true;
}

/**
* Returns a placement result. If the currentEnsemble does not adhere to the placement policy, returns a new
* ensemble that adheres to it. Implementations should minimize the number of bookies replaced.
*
* @param ensembleSize
* ensemble size
* @param writeQuorumSize
* writeQuorumSize of the ensemble
* @param ackQuorumSize
* ackQuorumSize of the ensemble
* @param excludeBookies
* bookies that should not be considered as targets
* @param currentEnsemble
* current ensemble
* @return a placement result
*/
default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
Set<BookieId> excludeBookies,
List<BookieId> currentEnsemble) {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you don't override this method?

Are we handling this exception in the code that calls this method?

My understanding is that we cannot provide a good default implementation.

In the calling code we could catch this exception, log something, and abort the operation gracefully.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's fine.

}

/**
* enum for PlacementPolicyAdherence. Currently we are supporting tri-value
* enum for PlacementPolicyAdherence. If placement policy is met strictly
Expand Down Expand Up @@ -481,7 +506,7 @@ public T getResult() {
return result;
}

public PlacementPolicyAdherence isAdheringToPolicy() {
public PlacementPolicyAdherence getAdheringToPolicy() {
return policyAdherence;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public class LedgerFragment {
private final long ledgerId;
private final DistributionSchedule schedule;
private final boolean isLedgerClosed;
private ReplicateType replicateType = ReplicateType.DATA_LOSS;

LedgerFragment(LedgerHandle lh,
public LedgerFragment(LedgerHandle lh,
long firstEntryId,
long lastKnownEntryId,
Set<Integer> bookieIndexes) {
Expand All @@ -56,7 +57,7 @@ public class LedgerFragment {
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
}

LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
this.ledgerId = lf.ledgerId;
this.firstEntryId = lf.firstEntryId;
this.lastKnownEntryId = lf.lastKnownEntryId;
Expand Down Expand Up @@ -91,7 +92,7 @@ public boolean isClosed() {
return isLedgerClosed;
}

long getLedgerId() {
public long getLedgerId() {
return ledgerId;
}

Expand Down Expand Up @@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
return this.ensemble;
}

/**
 * Returns the replicate type currently recorded on this fragment.
 *
 * @return the value of the {@code replicateType} field
 */
public ReplicateType getReplicateType() {
    return this.replicateType;
}

/**
 * Records the replicate type for this fragment, overwriting the previous value.
 *
 * @param replicateType
 *          the replicate type to store on this fragment
 */
public void setReplicateType(ReplicateType replicateType) {
    this.replicateType = replicateType;
}

/**
 * Renders the ledger id, the first and last known entry ids (each followed by the
 * corresponding stored entry id in brackets), the addresses, and the closed flag.
 *
 * @return a human-readable description of this fragment
 */
@Override
public String toString() {
    final String layout = "Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
            + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)";
    return String.format(layout,
            ledgerId,
            firstEntryId, getFirstStoredEntryId(),
            lastKnownEntryId, getLastStoredEntryId(),
            getAddresses(),
            isLedgerClosed);
}

/**
* ReplicateType.
*/
public enum ReplicateType {
DATA_LOSS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case do we set DATA_LOSS? I don't see it being set anywhere. Is it possible to add a test case to cover the DATA_LOSS case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see line_42, the default type is DATA_LOSS

DATA_NOT_ADHERING_PLACEMENT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,28 @@ public BookieNode selectFromNetworkLocation(
}
}

/**
 * Tries the superclass implementation first and, only when that result is
 * {@code PlacementPolicyAdherence.FAIL} and a {@code slave} policy is set, returns the
 * result of the {@code slave} policy called with the same arguments.
 *
 * @param ensembleSize
 *          ensemble size
 * @param writeQuorumSize
 *          writeQuorumSize of the ensemble
 * @param ackQuorumSize
 *          ackQuorumSize of the ensemble
 * @param excludeBookies
 *          bookies that should not be considered as targets
 * @param currentEnsemble
 *          current ensemble
 * @return the superclass result unless it failed and a {@code slave} is available,
 *         in which case the {@code slave}'s result
 */
@Override
public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
        int ensembleSize,
        int writeQuorumSize,
        int ackQuorumSize,
        Set<BookieId> excludeBookies,
        List<BookieId> currentEnsemble) {
    final PlacementResult<List<BookieId>> primaryResult = super.replaceToAdherePlacementPolicy(
            ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble);

    final boolean primaryFailed = primaryResult.getAdheringToPolicy() == PlacementPolicyAdherence.FAIL;
    // Keep the primary result when it succeeded or when there is no slave to fall back on.
    if (!primaryFailed || slave == null) {
        return primaryResult;
    }
    return slave.replaceToAdherePlacementPolicy(
            ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble);
}

@Override
public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
super.handleBookiesThatLeft(leftBookies);
Expand Down
Loading