-
Notifications
You must be signed in to change notification settings - Fork 963
Feature: auto recover support repaired not adhering placement ledger #3359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fb7384b
0108c98
f7a3b49
dd8d779
4a66b58
5433819
12582a7
1e49b22
d274c79
552f2d1
f6eeb35
828fcd0
cba288f
88808ef
44dcf84
a2f33d0
0ff5d96
1170150
733ba19
3f03848
c9b3b5a
af1d763
e4dc79a
15ea486
7669c7a
5028740
c54b168
afc9a2c
0b91805
aef4e3e
98147d7
a2de135
465f986
965bf96
edefdeb
6ef1e2a
85bd141
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,8 +33,10 @@ | |
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Enumeration; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
|
|
@@ -90,6 +92,7 @@ | |
| import org.apache.bookkeeper.stats.StatsLogger; | ||
| import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger; | ||
| import org.apache.bookkeeper.util.IOUtils; | ||
| import org.apache.commons.collections4.MapUtils; | ||
| import org.apache.zookeeper.AsyncCallback; | ||
| import org.apache.zookeeper.KeeperException; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -1095,7 +1098,7 @@ private Map<Integer, BookieId> getReplacementBookiesByIndexes( | |
| oldBookie, | ||
| bookiesToExclude); | ||
| BookieId newBookie = replaceBookieResponse.getResult(); | ||
| PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy(); | ||
| PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy(); | ||
| if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) { | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug( | ||
|
|
@@ -1129,14 +1132,23 @@ private ArrayList<BookieId> replaceBookiesInEnsemble( | |
| * @param ledgerFragment | ||
| * - LedgerFragment to replicate | ||
| */ | ||
| public void replicateLedgerFragment(LedgerHandle lh, | ||
| final LedgerFragment ledgerFragment, | ||
| final BiConsumer<Long, Long> onReadEntryFailureCallback) | ||
| throws InterruptedException, BKException { | ||
| Optional<Set<BookieId>> excludedBookies = Optional.empty(); | ||
| Map<Integer, BookieId> targetBookieAddresses = | ||
| getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), | ||
| ledgerFragment.getBookiesIndexes(), excludedBookies); | ||
| public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment, | ||
| final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException { | ||
| Map<Integer, BookieId> targetBookieAddresses = null; | ||
| if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the difference between
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the |
||
| Optional<Set<BookieId>> excludedBookies = Optional.empty(); | ||
| targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), | ||
| ledgerFragment.getBookiesIndexes(), excludedBookies); | ||
| } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) { | ||
| targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(), | ||
| lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize()); | ||
| ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet()); | ||
| } | ||
| if (MapUtils.isEmpty(targetBookieAddresses)) { | ||
| LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.", | ||
| ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId()); | ||
| return; | ||
| } | ||
| replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback); | ||
| } | ||
|
|
||
|
|
@@ -1777,6 +1789,31 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI | |
| ackQuorumSize); | ||
| } | ||
|
|
||
| public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList, | ||
| int writeQuorumSize, int ackQuorumSize) { | ||
| try { | ||
| EnsemblePlacementPolicy.PlacementResult<List<BookieId>> placementResult = bkc.getPlacementPolicy() | ||
| .replaceToAdherePlacementPolicy(ensembleBookiesList.size(), writeQuorumSize, ackQuorumSize, | ||
| new HashSet<>(), ensembleBookiesList); | ||
| if (PlacementPolicyAdherence.FAIL != placementResult.getAdheringToPolicy()) { | ||
| Map<Integer, BookieId> targetMap = new HashMap<>(); | ||
| List<BookieId> newEnsembles = placementResult.getResult(); | ||
| for (int i = 0; i < ensembleBookiesList.size(); i++) { | ||
| BookieId originBookie = ensembleBookiesList.get(i); | ||
| BookieId newBookie = newEnsembles.get(i); | ||
| if (!originBookie.equals(newBookie)) { | ||
| targetMap.put(i, newBookie); | ||
| } | ||
| } | ||
| return targetMap; | ||
| } | ||
| } catch (UnsupportedOperationException e) { | ||
| LOG.warn("The placement policy: {} didn't support replaceToAdherePlacementPolicy, " | ||
| + "ignore replace not adhere bookie.", bkc.getPlacementPolicy().getClass().getName()); | ||
| } | ||
| return Collections.emptyMap(); | ||
| } | ||
|
|
||
| /** | ||
| * Makes async request for getting list of entries of ledger from a bookie | ||
| * and returns Future for the result. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook | |
| return true; | ||
| } | ||
|
|
||
| /** | ||
| * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that | ||
| * adheres placement policy. It should be implemented so as to minify the number of bookies replaced. | ||
| * | ||
| * @param ensembleSize | ||
| * ensemble size | ||
| * @param writeQuorumSize | ||
| * writeQuorumSize of the ensemble | ||
| * @param ackQuorumSize | ||
| * ackQuorumSize of the ensemble | ||
| * @param excludeBookies | ||
| * bookies that should not be considered as targets | ||
| * @param currentEnsemble | ||
| * current ensemble | ||
| * @return a placement result | ||
| */ | ||
| default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy( | ||
horizonzy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| int ensembleSize, | ||
| int writeQuorumSize, | ||
| int ackQuorumSize, | ||
| Set<BookieId> excludeBookies, | ||
| List<BookieId> currentEnsemble) { | ||
| throw new UnsupportedOperationException(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if you don't override this method? Arw we handling this exception in the code that calls this method? My understanding is that we cannot provide a good default implementation. In the called code we could catch this exception, log something and abort gracefully the operation
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's fine. |
||
| } | ||
|
|
||
| /** | ||
| * enum for PlacementPolicyAdherence. Currently we are supporting tri-value | ||
| * enum for PlacementPolicyAdherence. If placement policy is met strictly | ||
|
|
@@ -481,8 +506,16 @@ public T getResult() { | |
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Use {@link #getAdheringToPolicy}. | ||
| */ | ||
| @Deprecated | ||
| public PlacementPolicyAdherence isAdheringToPolicy() { | ||
| return policyAdherence; | ||
| } | ||
|
|
||
| public PlacementPolicyAdherence getAdheringToPolicy() { | ||
horizonzy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return policyAdherence; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,8 +39,9 @@ public class LedgerFragment { | |
| private final long ledgerId; | ||
| private final DistributionSchedule schedule; | ||
| private final boolean isLedgerClosed; | ||
| private ReplicateType replicateType = ReplicateType.DATA_LOSS; | ||
|
|
||
| LedgerFragment(LedgerHandle lh, | ||
| public LedgerFragment(LedgerHandle lh, | ||
| long firstEntryId, | ||
| long lastKnownEntryId, | ||
| Set<Integer> bookieIndexes) { | ||
|
|
@@ -56,7 +57,7 @@ public class LedgerFragment { | |
| || !ensemble.equals(ensembles.get(ensembles.lastKey())); | ||
| } | ||
|
|
||
| LedgerFragment(LedgerFragment lf, Set<Integer> subset) { | ||
| public LedgerFragment(LedgerFragment lf, Set<Integer> subset) { | ||
| this.ledgerId = lf.ledgerId; | ||
| this.firstEntryId = lf.firstEntryId; | ||
| this.lastKnownEntryId = lf.lastKnownEntryId; | ||
|
|
@@ -91,7 +92,7 @@ public boolean isClosed() { | |
| return isLedgerClosed; | ||
| } | ||
|
|
||
| long getLedgerId() { | ||
| public long getLedgerId() { | ||
| return ledgerId; | ||
| } | ||
|
|
||
|
|
@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() { | |
| return this.ensemble; | ||
| } | ||
|
|
||
| public ReplicateType getReplicateType() { | ||
| return replicateType; | ||
| } | ||
|
|
||
| public void setReplicateType(ReplicateType replicateType) { | ||
| this.replicateType = replicateType; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], " | ||
| + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId, | ||
| getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(), | ||
| getAddresses(), isLedgerClosed); | ||
| } | ||
|
|
||
| /** | ||
| * ReplicateType. | ||
| */ | ||
| public enum ReplicateType { | ||
| DATA_LOSS, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In which case, do we set
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see line_42, the default type is |
||
| DATA_NOT_ADHERING_PLACEMENT | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change it just for test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.