-
Notifications
You must be signed in to change notification settings - Fork 963
Feature: auto recover support repaired not adhering placement ledger #3359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
fb7384b
0108c98
f7a3b49
dd8d779
4a66b58
5433819
12582a7
1e49b22
d274c79
552f2d1
f6eeb35
828fcd0
cba288f
88808ef
44dcf84
a2f33d0
0ff5d96
1170150
733ba19
3f03848
c9b3b5a
af1d763
e4dc79a
15ea486
7669c7a
5028740
c54b168
afc9a2c
0b91805
aef4e3e
98147d7
a2de135
465f986
965bf96
edefdeb
6ef1e2a
85bd141
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,7 @@ | |
| import org.apache.bookkeeper.stats.StatsLogger; | ||
| import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger; | ||
| import org.apache.bookkeeper.util.IOUtils; | ||
| import org.apache.commons.collections4.MapUtils; | ||
| import org.apache.zookeeper.AsyncCallback; | ||
| import org.apache.zookeeper.KeeperException; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -1129,14 +1130,24 @@ private ArrayList<BookieId> replaceBookiesInEnsemble( | |
| * @param ledgerFragment | ||
| * - LedgerFragment to replicate | ||
| */ | ||
| public void replicateLedgerFragment(LedgerHandle lh, | ||
| final LedgerFragment ledgerFragment, | ||
| final BiConsumer<Long, Long> onReadEntryFailureCallback) | ||
| throws InterruptedException, BKException { | ||
| Optional<Set<BookieId>> excludedBookies = Optional.empty(); | ||
| Map<Integer, BookieId> targetBookieAddresses = | ||
| getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), | ||
| ledgerFragment.getBookiesIndexes(), excludedBookies); | ||
| public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment, | ||
| final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException { | ||
| Map<Integer, BookieId> targetBookieAddresses = null; | ||
| if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the difference between
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. On the |
||
| Optional<Set<BookieId>> excludedBookies = Optional.empty(); | ||
| targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(), | ||
| ledgerFragment.getBookiesIndexes(), excludedBookies); | ||
| } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) { | ||
| targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(), | ||
| lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize(), | ||
| lh.getLedgerMetadata().getCustomMetadata()); | ||
| ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet()); | ||
| } | ||
| if (MapUtils.isEmpty(targetBookieAddresses)) { | ||
| LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.", | ||
| ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId()); | ||
| return; | ||
| } | ||
| replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback); | ||
| } | ||
|
|
||
|
|
@@ -1777,6 +1788,13 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI | |
| ackQuorumSize); | ||
| } | ||
|
|
||
| public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList, | ||
| int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata) { | ||
| return bkc.getPlacementPolicy() | ||
| .replaceNotAdheringPlacementPolicyBookie(ensembleBookiesList, writeQuorumSize, ackQuorumSize, | ||
| customMetadata); | ||
| } | ||
|
|
||
| /** | ||
| * Makes async request for getting list of entries of ledger from a bookie | ||
| * and returns Future for the result. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,8 +39,9 @@ public class LedgerFragment { | |
| private final long ledgerId; | ||
| private final DistributionSchedule schedule; | ||
| private final boolean isLedgerClosed; | ||
| private ReplicateType replicateType = ReplicateType.DATA_LOSS; | ||
|
|
||
| LedgerFragment(LedgerHandle lh, | ||
| public LedgerFragment(LedgerHandle lh, | ||
| long firstEntryId, | ||
| long lastKnownEntryId, | ||
| Set<Integer> bookieIndexes) { | ||
|
|
@@ -56,7 +57,7 @@ public class LedgerFragment { | |
| || !ensemble.equals(ensembles.get(ensembles.lastKey())); | ||
| } | ||
|
|
||
| LedgerFragment(LedgerFragment lf, Set<Integer> subset) { | ||
| public LedgerFragment(LedgerFragment lf, Set<Integer> subset) { | ||
| this.ledgerId = lf.ledgerId; | ||
| this.firstEntryId = lf.firstEntryId; | ||
| this.lastKnownEntryId = lf.lastKnownEntryId; | ||
|
|
@@ -91,7 +92,7 @@ public boolean isClosed() { | |
| return isLedgerClosed; | ||
| } | ||
|
|
||
| long getLedgerId() { | ||
| public long getLedgerId() { | ||
| return ledgerId; | ||
| } | ||
|
|
||
|
|
@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() { | |
| return this.ensemble; | ||
| } | ||
|
|
||
| public ReplicateType getReplicateType() { | ||
| return replicateType; | ||
| } | ||
|
|
||
| public void setReplicateType(ReplicateType replicateType) { | ||
| this.replicateType = replicateType; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], " | ||
| + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId, | ||
| getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(), | ||
| getAddresses(), isLedgerClosed); | ||
| } | ||
|
|
||
| /** | ||
| * ReplicateType. | ||
| */ | ||
| public enum ReplicateType { | ||
| DATA_LOSS, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In which case do we set
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See line 42; the default type is |
||
| DATA_NOT_ADHERING_PLACEMENT | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,10 +33,12 @@ | |
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ThreadLocalRandom; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
| import java.util.function.Supplier; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; | ||
| import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject; | ||
| import org.apache.bookkeeper.net.BookieId; | ||
|
|
@@ -52,6 +54,7 @@ | |
| import org.apache.bookkeeper.stats.Counter; | ||
| import org.apache.bookkeeper.stats.OpStatsLogger; | ||
| import org.apache.bookkeeper.stats.annotations.StatsDoc; | ||
| import org.apache.commons.collections4.CollectionUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) { | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize, | ||
| int ackQuorumSize, Map<String, byte[]> customMetadata) { | ||
| rwLock.readLock().lock(); | ||
| try { | ||
| if (CollectionUtils.isEmpty(ensemble)) { | ||
| return Collections.emptyMap(); | ||
| } | ||
| PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble, | ||
| writeQuorumSize, ackQuorumSize); | ||
| if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) { | ||
| return Collections.emptyMap(); | ||
| } | ||
| Map<BookieId, Integer> bookieIndex = new HashMap<>(); | ||
| for (int i = 0; i < ensemble.size(); i++) { | ||
| bookieIndex.put(ensemble.get(i), i); | ||
| } | ||
| Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies); | ||
|
|
||
| Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>(); | ||
| for (BookieId bookieId : ensemble) { | ||
| // When ReplicationWorker.getUnderreplicatedFragments ran, the bookie was alive, so the fragment was not | ||
| // classified as data_loss. By the time we look for a bookie in another rack to replace it, the bookie may | ||
| // have shut down, so we pick out the shut-down bookies here: a bookieId that is no longer known is put in | ||
| // the INACTIVE group. When replacing, inactive bookies are replaced first. | ||
| BookieNode bookieNode = clone.get(bookieId); | ||
| if (bookieNode == null) { | ||
| List<BookieNode> list = toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE, | ||
| k -> new ArrayList<>()); | ||
| list.add(new BookieNode(bookieId, NetworkTopology.INACTIVE)); | ||
| } else { | ||
| List<BookieNode> list = toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(), | ||
| k -> new ArrayList<>()); | ||
| list.add(bookieNode); | ||
| } | ||
| } | ||
| for (List<BookieNode> bookieNodes : toPlaceGroup.values()) { | ||
| Collections.shuffle(bookieNodes); | ||
| } | ||
|
|
||
| Map<String, List<BookieNode>> knownRackToBookies = clone.values().stream() | ||
| .collect(Collectors.groupingBy(NodeBase::getNetworkLocation)); | ||
| HashSet<String> knownRacks = new HashSet<>(knownRackToBookies.keySet()); | ||
|
|
||
| Set<BookieId> excludesBookies = new HashSet<>(); | ||
|
|
||
| for (String key : toPlaceGroup.keySet()) { | ||
| List<BookieNode> sameRack = knownRackToBookies.get(key); | ||
| if (!CollectionUtils.isEmpty(sameRack)) { | ||
| excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet())); | ||
| } | ||
| } | ||
|
|
||
| Map<Integer, BookieId> targetBookieAddresses = new HashMap<>(); | ||
| boolean placeSucceed = false; | ||
| while (knownRacks.size() > 0) { | ||
| BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup); | ||
| if (beReplaceNode == null) { | ||
| break; | ||
| } | ||
| Integer index = bookieIndex.get(beReplaceNode.getAddr()); | ||
| try { | ||
| PlacementResult<BookieId> placementResult = replaceBookie(ensemble.size(), writeQuorumSize, | ||
| ackQuorumSize, customMetadata, ensemble, beReplaceNode.getAddr(), excludesBookies); | ||
| BookieNode replaceNode = clone.get(placementResult.getResult()); | ||
| String replaceNodeNetwork = replaceNode.getNetworkLocation(); | ||
| knownRacks.remove(replaceNodeNetwork); | ||
| List<BookieNode> nodes = toPlaceGroup.computeIfAbsent(replaceNodeNetwork, | ||
| k -> new ArrayList<>()); | ||
| nodes.add(replaceNode); | ||
| targetBookieAddresses.put(index, replaceNode.getAddr()); | ||
| List<BookieNode> bookieNodes = knownRackToBookies.get(replaceNodeNetwork); | ||
| if (!CollectionUtils.isEmpty(bookieNodes)) { | ||
| excludesBookies.addAll( | ||
| bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet())); | ||
| } | ||
| } catch (BKException.BKNotEnoughBookiesException e) { | ||
| LOG.warn("Didn't find replaced bookie to adhere placement policy.", e); | ||
| break; | ||
| } | ||
|
|
||
| List<BookieId> ensembles = toPlaceGroup.values().stream().flatMap(Collection::stream).map( | ||
| BookieNode::getAddr).collect(Collectors.toList()); | ||
| ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensembles, | ||
| writeQuorumSize, ackQuorumSize); | ||
| if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) { | ||
| placeSucceed = true; | ||
| break; | ||
| } | ||
| } | ||
|
||
| return placeSucceed ? targetBookieAddresses : Collections.emptyMap(); | ||
| } finally { | ||
| rwLock.readLock().unlock(); | ||
| } | ||
| } | ||
|
|
||
| private BookieNode getBeReplaceNode(Map<String, List<BookieNode>> toPlaceGroup) { | ||
| List<BookieNode> inactiveNodes = toPlaceGroup.get(NetworkTopology.INACTIVE); | ||
| if (!CollectionUtils.isEmpty(inactiveNodes)) { | ||
| return inactiveNodes.remove(inactiveNodes.size() - 1); | ||
| } | ||
| Optional<Map.Entry<String, List<BookieNode>>> toPlaceEntry = toPlaceGroup.entrySet().stream() | ||
| .filter(ele -> ele.getValue().size() > 1).findAny(); | ||
| if (!toPlaceEntry.isPresent()) { | ||
| return null; | ||
| } | ||
| Map.Entry<String, List<BookieNode>> entry = toPlaceEntry.get(); | ||
| return entry.getValue().remove(entry.getValue().size() - 1); | ||
| } | ||
|
|
||
| protected BookieNode createBookieNode(BookieId addr) { | ||
| return new BookieNode(addr, resolveNetworkLocation(addr)); | ||
| } | ||
|
|
@@ -812,9 +925,9 @@ protected String resolveNetworkLocation(BookieId addr) { | |
| } | ||
| } | ||
|
|
||
| protected Set<Node> convertBookiesToNodes(Collection<BookieId> excludeBookies) { | ||
| protected Set<Node> convertBookiesToNodes(Collection<BookieId> bookies) { | ||
| Set<Node> nodes = new HashSet<Node>(); | ||
| for (BookieId addr : excludeBookies) { | ||
| for (BookieId addr : bookies) { | ||
| BookieNode bn = knownBookies.get(addr); | ||
| if (null == bn) { | ||
| bn = createBookieNode(addr); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this changed just for the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.