Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb7384b
Support repaired not adhering placement policy ledger.
horizonzy Jun 24, 2022
0108c98
add unit test.
horizonzy Jun 25, 2022
f7a3b49
code clean.
horizonzy Jun 25, 2022
dd8d779
tuning doc.
horizonzy Jun 25, 2022
4a66b58
code tuning
horizonzy Jun 26, 2022
5433819
Complete the test case.
horizonzy Jun 27, 2022
12582a7
fix checkstyle.
horizonzy Jun 27, 2022
1e49b22
code clean.
horizonzy Jul 6, 2022
d274c79
do nothing when didn't find same rack bookies num > 1
horizonzy Jul 10, 2022
552f2d1
use replaceBookie.
horizonzy Jul 10, 2022
f6eeb35
fix check style.
horizonzy Jul 10, 2022
828fcd0
code clean.
horizonzy Jul 11, 2022
cba288f
enhance test cases.
horizonzy Jul 11, 2022
88808ef
enhance unit tests.
horizonzy Jul 11, 2022
44dcf84
enhance unit test.
horizonzy Jul 12, 2022
a2f33d0
code clean.
horizonzy Jul 12, 2022
0ff5d96
code clean.
horizonzy Jul 12, 2022
1170150
Merge branch 'master' into feature-auto-recover-match-placement
horizonzy Jul 13, 2022
733ba19
tuning allocator pooling concurrency.
horizonzy Jul 29, 2022
3f03848
make replaceNotAdheringPlacementPolicyBookie default implements.
horizonzy Jul 29, 2022
c9b3b5a
fix checkstyle.
horizonzy Jul 31, 2022
af1d763
remove duplicated dependency.
horizonzy Jul 31, 2022
e4dc79a
add config in read me file.
horizonzy Aug 1, 2022
15ea486
tuning readme file.
horizonzy Aug 4, 2022
7669c7a
address comment.
horizonzy Aug 4, 2022
5028740
feat: add relocate placement command
equanz Nov 15, 2021
c54b168
feat: add relocate placement command
equanz Mar 17, 2022
afc9a2c
enhance replaceToAdherePlacementPolicy.
horizonzy Aug 5, 2022
0b91805
enhance replaceToAdherePlacementPolicy.
horizonzy Aug 5, 2022
aef4e3e
use replaceToAdherePlacementPolicy to find target bookie.
horizonzy Aug 6, 2022
98147d7
fix checkstyle.
horizonzy Aug 6, 2022
a2de135
fix checkstyle.
horizonzy Aug 7, 2022
465f986
address comment.
horizonzy Aug 7, 2022
965bf96
address comment.
horizonzy Aug 8, 2022
edefdeb
fix check style.
horizonzy Aug 8, 2022
6ef1e2a
address comment.
horizonzy Aug 8, 2022
85bd141
reset StaticDNSResolver after test.
horizonzy Aug 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bookkeeper-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieToFreeSpaceMap) {
return;
}

@Override
public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata) {
//local bookie needn't support it.
return Collections.emptyMap();
}

@Override
public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList,
int writeQuorumSize, int ackQuorumSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
bookieQuarantineRatio = 1.0;
}

private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change it just for test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

DNSToSwitchMapping dnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1128,14 +1129,24 @@ private ArrayList<BookieId> replaceBookiesInEnsemble(
* @param ledgerFragment
* - LedgerFragment to replicate
*/
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
final BiConsumer<Long, Long> onReadEntryFailureCallback)
throws InterruptedException, BKException {
Optional<Set<BookieId>> excludedBookies = Optional.empty();
Map<Integer, BookieId> targetBookieAddresses =
getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
Map<Integer, BookieId> targetBookieAddresses = null;
if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between DATA_LOSS and DATA_NOT_ADHERING_PLACEMENT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the DATA_LOSS, we know the bad bookie we want to replace in the ledgerChecker.
On the DATA_NOT_ADHERING_PLACEMENT, we just know the ensemble is not adhering the placement policy, so we need find which bookie should be replaced.

Optional<Set<BookieId>> excludedBookies = Optional.empty();
targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
} else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) {
targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(),
lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize(),
lh.getLedgerMetadata().getCustomMetadata());
ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet());
}
if (MapUtils.isEmpty(targetBookieAddresses)) {
LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.",
ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId());
return;
}
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
}

Expand Down Expand Up @@ -1776,6 +1787,13 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI
ackQuorumSize);
}

public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList,
int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata) {
return bkc.getPlacementPolicy()
.replaceNotAdheringPlacementPolicyBookie(ensembleBookiesList, writeQuorumSize, ackQuorumSize,
customMetadata);
}

/**
* Makes async request for getting list of entries of ledger from a bookie
* and returns Future for the result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
}
}

@Override
public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata) {
//Default ensemble placement policy always adhering.
return Collections.emptyMap();
}

@Override
public void uninitalize() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,18 @@ DistributionSchedule.WriteSet reorderReadLACSequence(
default void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
}

/**
* Replace some bookie to adhering placement policy. If the all kinds of replacement
* didn't adhere placement policy, return empty map.
*
* @param ensemble
* @param writeQuorumSize
* @param ackQuorumSize
* @return Map: key means ensemble index, value means target replace bookieId.
*/
Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it a default implementation so it won't break the user-side implementation? Like how line 364 do it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we cherry-pick this to a minor release, this will break the existing implementation which is out of this repo. So I would suggest using the default to avoid this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we must add a default implementation

int ackQuorumSize, Map<String, byte[]> customMetadata);

/**
* Select one bookie to the "sticky" bookie where all reads for a particular
* ledger will be directed to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public class LedgerFragment {
private final long ledgerId;
private final DistributionSchedule schedule;
private final boolean isLedgerClosed;
private ReplicateType replicateType = ReplicateType.DATA_LOSS;

LedgerFragment(LedgerHandle lh,
public LedgerFragment(LedgerHandle lh,
long firstEntryId,
long lastKnownEntryId,
Set<Integer> bookieIndexes) {
Expand All @@ -56,7 +57,7 @@ public class LedgerFragment {
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
}

LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
this.ledgerId = lf.ledgerId;
this.firstEntryId = lf.firstEntryId;
this.lastKnownEntryId = lf.lastKnownEntryId;
Expand Down Expand Up @@ -91,7 +92,7 @@ public boolean isClosed() {
return isLedgerClosed;
}

long getLedgerId() {
public long getLedgerId() {
return ledgerId;
}

Expand Down Expand Up @@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
return this.ensemble;
}

public ReplicateType getReplicateType() {
return replicateType;
}

public void setReplicateType(ReplicateType replicateType) {
this.replicateType = replicateType;
}

@Override
public String toString() {
return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
+ "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
getAddresses(), isLedgerClosed);
}

/**
* ReplicateType.
*/
public enum ReplicateType {
DATA_LOSS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case, do we set DATA_LOSS ? I don't see if it's being set anywhere? is it possible to add test-case to cover DATA_LOSS case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see line_42, the default type is DATA_LOSS

DATA_NOT_ADHERING_PLACEMENT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
Expand All @@ -54,6 +56,7 @@
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -790,6 +793,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
}
}

@Override
public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata) {
rwLock.readLock().lock();
try {
if (CollectionUtils.isEmpty(ensemble)) {
return Collections.emptyMap();
}
PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
writeQuorumSize, ackQuorumSize);
if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
return Collections.emptyMap();
}
Map<BookieId, Integer> bookieIndex = new HashMap<>();
for (int i = 0; i < ensemble.size(); i++) {
bookieIndex.put(ensemble.get(i), i);
}
Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);

Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
for (BookieId bookieId : ensemble) {
//When ReplicationWorker.getUnderreplicatedFragments, the bookie is alive, so the fragment is not
// data_loss. When find other rack bookie to replace, the bookie maybe shutdown, so here we should pick
// the shutdown bookies. If the bookieId shutdown, put it to inactive. When do replace, we should
// replace inactive bookie firstly.
BookieNode bookieNode = clone.get(bookieId);
if (bookieNode == null) {
List<BookieNode> list = toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
k -> new ArrayList<>());
list.add(new BookieNode(bookieId, NetworkTopology.INACTIVE));
} else {
List<BookieNode> list = toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
k -> new ArrayList<>());
list.add(bookieNode);
}
}
for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
Collections.shuffle(bookieNodes);
}

Map<String, List<BookieNode>> knownRackToBookies = clone.values().stream()
.collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
HashSet<String> knownRacks = new HashSet<>(knownRackToBookies.keySet());

Set<BookieId> excludesBookies = new HashSet<>();

for (String key : toPlaceGroup.keySet()) {
List<BookieNode> sameRack = knownRackToBookies.get(key);
if (!CollectionUtils.isEmpty(sameRack)) {
excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
}
}

Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
boolean placeSucceed = false;
while (knownRacks.size() > 0) {
BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
if (beReplaceNode == null) {
break;
}
Integer index = bookieIndex.get(beReplaceNode.getAddr());
try {
PlacementResult<BookieId> placementResult = replaceBookie(ensemble.size(), writeQuorumSize,
ackQuorumSize, customMetadata, ensemble, beReplaceNode.getAddr(), excludesBookies);
BookieNode replaceNode = clone.get(placementResult.getResult());
String replaceNodeNetwork = replaceNode.getNetworkLocation();
knownRacks.remove(replaceNodeNetwork);
List<BookieNode> nodes = toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
k -> new ArrayList<>());
nodes.add(replaceNode);
targetBookieAddresses.put(index, replaceNode.getAddr());
List<BookieNode> bookieNodes = knownRackToBookies.get(replaceNodeNetwork);
if (!CollectionUtils.isEmpty(bookieNodes)) {
excludesBookies.addAll(
bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
}
} catch (BKException.BKNotEnoughBookiesException e) {
LOG.warn("Didn't find replaced bookie to adhere placement policy.", e);
break;
}

List<BookieId> ensembles = toPlaceGroup.values().stream().flatMap(Collection::stream).map(
BookieNode::getAddr).collect(Collectors.toList());
ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensembles,
writeQuorumSize, ackQuorumSize);
if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
placeSucceed = true;
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my understanding, we can't recover the ensemble fragment, which has multiple bookies for a specific rack.

For example, consider E=5, Qw=2, Qa=2 ensemble fragment like ["128.0.0.0:3181", "128.0.0.3:3181", "128.0.0.1:3181", "128.0.0.6:3181", "128.0.0.4:3181"], use RackawareEnsemblePlacementPolicy, and set enforceMinNumRacksPerWriteQuorum to false.
(definition of rack: https://github.com/apache/bookkeeper/pull/3359/files#diff-aac3491b47dd2a6b2936e726b36151e2d232409878845c599beb7df5a3c739afR1489-R1498 )

When /default-region/r3 goes down, RackawareEnsemblePlacementPolicy#replaceBookie returns new ensemble fragment by random selection like ["128.0.0.0:3181", "128.0.0.3:3181", "128.0.0.1:3181", "128.0.0.2:3181", "128.0.0.4:3181"].

After that and /default-region/r3 is recovered, then TopologyAwareEnsemblePlacementPolicy#replaceNotAdheringPlacementPolicyBookie can't return MEET_STRICT result.

Test cases are as below.

% git --no-pager show --no-patch HEAD
commit 11701505ffda2fe68c84a2621aaf595ef51d9979 (HEAD, horizonzy/feature-auto-recover-match-placement)
Merge: 0ff5d96f3 c3706e9c2
Author: horizonzy <horizonzy@apache.org>
Date:   Wed Jul 13 09:27:50 2022 +0800

    Merge branch 'master' into feature-auto-recover-match-placement

    # Conflicts:
    #       bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
    #       bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 0b80f7af0..5dee731e6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.
 import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
 import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;

+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.util.HashedWheelTimer;
 import java.net.InetAddress;
@@ -40,6 +41,7 @@ import junit.framework.TestCase;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementResult;
 import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
 import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
 import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
@@ -1421,11 +1423,11 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
     public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
         repp.uninitalize();

-        int minNumRacksPerWriteQuorum = 3;
+        int minNumRacksPerWriteQuorum = 2;
         ClientConfiguration clientConf = new ClientConfiguration(conf);
         clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
         // set enforceMinNumRacksPerWriteQuorum
-        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(false);
         repp = new RackawareEnsemblePlacementPolicy();
         repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
                 NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
@@ -1436,6 +1438,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         String[] rackLocationNames = new String[numOfRacks];
         List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
         Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        Map<String, Set<BookieId>> rackMap = new HashMap<>();
         BookieId bookieAddress;

         for (int i = 0; i < numOfRacks; i++) {
@@ -1446,29 +1449,45 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
                 bookieSocketAddresses.add(bookieAddress);
                 bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+                rackMap.computeIfAbsent(rackLocationNames[i], k -> Sets.newHashSet()).add(bookieAddress);
             }
         }

         repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());

-        int writeQuorum = 3;
-        int ackQuorum = 3;
+        int writeQuorum = 2;
+        int ackQuorum = 2;

         //test three knows bookie
         List<BookieId> knowsEnsemble = new ArrayList<>();
         knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.3:3181"));
         knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
-        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+        //knowsEnsemble.add(BookieId.parse("128.0.0.2:3181")); // should be replaced to /r3 like 128.0.0.6
+        knowsEnsemble.add(BookieId.parse("128.0.0.6:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.4:3181"));

         PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
                 knowsEnsemble, writeQuorum, ackQuorum);
-        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+        //assertEquals(PlacementPolicyAdherence.FAIL, placementPolicyAdherence);
+
+        // /default-region/r3 goes down
+        repp.handleBookiesThatLeft(rackMap.get(rackLocationNames[2]));
+
+        final PlacementResult<BookieId> result = repp.replaceBookie(knowsEnsemble.size(), writeQuorum, ackQuorum,
+                Collections.emptyMap(), knowsEnsemble, knowsEnsemble.get(3), Sets.newHashSet());
+
+        assertNotSame(PlacementPolicyAdherence.MEETS_STRICT, result.isAdheringToPolicy());
+        assertNotSame(rackLocationNames[2], bookieRackMap.get(result.getResult()));
+        knowsEnsemble.set(3, result.getResult());
+        LOG.error("{}", knowsEnsemble);
+
+        // /default-region/r3 is recovered
+        repp.handleBookiesThatJoined(rackMap.get(rackLocationNames[2]));

         Map<Integer, BookieId> targetBookie =
-                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, ackQuorum, writeQuorum,
+                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, writeQuorum, ackQuorum,
                         Collections.emptyMap());
-        //should replace two bookie
-        assertEquals(targetBookie.size(), 2);

         for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
             knowsEnsemble.set(entry.getKey(), entry.getValue());
@@ -1478,6 +1497,10 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 knowsEnsemble, writeQuorum, ackQuorum);
         assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.MEETS_STRICT);

+        //should replace one bookie
+        LOG.error(String.valueOf(targetBookie));
+        assertEquals(1, targetBookie.size());
+
         //test three unknowns bookie
         List<BookieId> unknownEnsembles = new ArrayList<>();
         unknownEnsembles.add(BookieId.parse("128.0.0.100:3181"));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #2931 PR, I've tried to fix a similar issue considering the above case.

If we don't care about the above case in this PR, but it is still an issue, I'll try to fix it in #2931 by following your interfaces.

Copy link
Member Author

@horizonzy horizonzy Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, cause the same rack bookie is close, we should consider the order of ensemble.

return placeSucceed ? targetBookieAddresses : Collections.emptyMap();
} finally {
rwLock.readLock().unlock();
}
}

private BookieNode getBeReplaceNode(Map<String, List<BookieNode>> toPlaceGroup) {
List<BookieNode> inactiveNodes = toPlaceGroup.get(NetworkTopology.INACTIVE);
if (!CollectionUtils.isEmpty(inactiveNodes)) {
return inactiveNodes.remove(inactiveNodes.size() - 1);
}
Optional<Map.Entry<String, List<BookieNode>>> toPlaceEntry = toPlaceGroup.entrySet().stream()
.filter(ele -> ele.getValue().size() > 1).findAny();
if (!toPlaceEntry.isPresent()) {
return null;
}
Map.Entry<String, List<BookieNode>> entry = toPlaceEntry.get();
return entry.getValue().remove(entry.getValue().size() - 1);
}

protected BookieNode createBookieNode(BookieId addr) {
return new BookieNode(addr, resolveNetworkLocation(addr));
}
Expand All @@ -814,9 +927,9 @@ protected String resolveNetworkLocation(BookieId addr) {
}
}

protected Set<Node> convertBookiesToNodes(Collection<BookieId> excludeBookies) {
protected Set<Node> convertBookiesToNodes(Collection<BookieId> bookies) {
Set<Node> nodes = new HashSet<Node>();
for (BookieId addr : excludeBookies) {
for (BookieId addr : bookies) {
BookieNode bn = knownBookies.get(addr);
if (null == bn) {
bn = createBookieNode(addr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
protected static final String AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL =
"auditorPeriodicPlacementPolicyCheckInterval";
protected static final String REPAIRED_PLACEMENT_POLICY_NOT_ADHERING_BOOKIE_ENABLED =
"repairedPlacementPolicyNotAdheringBookieEnabled";
protected static final String AUDITOR_LEDGER_VERIFICATION_PERCENTAGE = "auditorLedgerVerificationPercentage";
protected static final String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
protected static final String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
Expand Down Expand Up @@ -2575,6 +2577,14 @@ public long getAuditorPeriodicPlacementPolicyCheckInterval() {
return getLong(AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL, 0);
}

public void setRepairedPlacementPolicyNotAdheringBookieEnable(boolean enabled) {
setProperty(REPAIRED_PLACEMENT_POLICY_NOT_ADHERING_BOOKIE_ENABLED, enabled);
}

public boolean getRepairedPlacementPolicyNotAdheringBookieEnable() {
return getBoolean(REPAIRED_PLACEMENT_POLICY_NOT_ADHERING_BOOKIE_ENABLED, false);
}

/**
* Sets the grace period (in seconds) for underreplicated ledgers recovery.
* If ledger is marked underreplicated for more than this period then it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public interface NetworkTopology {
String DEFAULT_RACK = "/default-rack";
String DEFAULT_ZONE = "/default-zone";
String DEFAULT_UPGRADEDOMAIN = "/default-upgradedomain";
String INACTIVE = "/inactive";
String DEFAULT_ZONE_AND_UPGRADEDOMAIN = DEFAULT_ZONE + DEFAULT_UPGRADEDOMAIN;
String DEFAULT_REGION_AND_RACK = DEFAULT_REGION + DEFAULT_RACK;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,22 @@ public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
//If user enable repaired, mark this ledger to under replication manager.
if (conf.getRepairedPlacementPolicyNotAdheringBookieEnable()) {
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
Collections.emptyList()).whenComplete((res, e) -> {
if (e != null) {
LOG.error("For ledger: {}, the placement policy not adhering bookie "
+ "storage, mark it to under replication manager failed.",
ledgerId, e);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("For ledger: {}, the placement policy not adhering bookie"
+ " storage, mark it to under replication manager", ledgerId);
}
});
}
} else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.incrementAndGet();
}
Expand Down
Loading