Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,66 +257,69 @@ void doPlacementPolicyCheck(Long ledgerId,
LedgerMetadata metadata = metadataVer.getValue();
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
if (metadata.isClosed()) {
boolean foundSegmentNotAdheringToPlacementPolicy = false;
boolean foundSegmentSoftlyAdheringToPlacementPolicy = false;
for (Map.Entry<Long, ? extends List<BookieId>> ensemble : metadata
.getAllEnsembles().entrySet()) {
long startEntryIdOfSegment = ensemble.getKey();
List<BookieId> ensembleOfSegment = ensemble.getValue();
EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin
.isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
foundSegmentNotAdheringToPlacementPolicy = true;
LOG.warn(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+ "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+ "EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
} else if (segmentAdheringToPlacementPolicy
== EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) {
foundSegmentSoftlyAdheringToPlacementPolicy = true;
if (LOG.isDebugEnabled()) {
LOG.debug(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {}"
+ " having writeQuorumSize: {} and ackQuorumSize: {} is"
+ " softly adhering to EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
}
boolean foundSegmentNotAdheringToPlacementPolicy = false;
boolean foundSegmentSoftlyAdheringToPlacementPolicy = false;
int ensembleIndex = 0;
int ensembleSize = metadata.getAllEnsembles().size();
for (Map.Entry<Long, ? extends List<BookieId>> ensemble : metadata
.getAllEnsembles().entrySet()) {
if (++ensembleIndex == ensembleSize && !metadata.isClosed()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} last ensemble is not yet closed, so skipping the placementPolicy"
+ "check analysis for now", ledgerId);
}
break;
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
//If user enable repaired, mark this ledger to under replication manager.
if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) {
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
Collections.emptyList()).whenComplete((res, e) -> {
if (e != null) {
LOG.error("For ledger: {}, the placement policy not adhering bookie "
+ "storage, mark it to under replication manager failed.",
ledgerId, e);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("For ledger: {}, the placement policy not adhering bookie"
+ " storage, mark it to under replication manager", ledgerId);
}
});
long startEntryIdOfSegment = ensemble.getKey();
List<BookieId> ensembleOfSegment = ensemble.getValue();
EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin
.isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
foundSegmentNotAdheringToPlacementPolicy = true;
LOG.warn(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+ "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+ "EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
} else if (segmentAdheringToPlacementPolicy
== EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) {
foundSegmentSoftlyAdheringToPlacementPolicy = true;
if (LOG.isDebugEnabled()) {
LOG.debug(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {}"
+ " having writeQuorumSize: {} and ackQuorumSize: {} is"
+ " softly adhering to EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
}
} else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck
.incrementAndGet();
}
numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy"
+ "check analysis for now", ledgerId);
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
//If user enable repaired, mark this ledger to under replication manager.
if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) {
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
Collections.emptyList()).whenComplete((res, e) -> {
if (e != null) {
LOG.error("For ledger: {}, the placement policy not adhering bookie "
+ "storage, mark it to under replication manager failed.",
ledgerId, e);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("For ledger: {}, the placement policy not adhering bookie"
+ " storage, mark it to under replication manager", ledgerId);
}
});
}
} else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck
.incrementAndGet();
}
numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet();

iterCallback.processResult(BKException.Code.OK, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,6 @@ private Set<LedgerFragment> getNeedRepairedPlacementNotAdheringFragments(LedgerH
LedgerMetadata metadata = metadataVer.getValue();
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
if (!metadata.isClosed()) {
return;
}
Long curEntryId = null;
EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null;

Expand All @@ -409,7 +406,7 @@ private Set<LedgerFragment> getNeedRepairedPlacementNotAdheringFragments(LedgerH
writeQuorumSize, ackQuorumSize);
curEntryId = entry.getKey();
}
if (curEntryId != null) {
if (curEntryId != null && metadata.isClosed()) {
if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
== previousSegmentAdheringToPlacementPolicy) {
long lastEntry = lh.getLedgerMetadata().getLastEntryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,12 +1239,22 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
//This ledger not adhering placement policy, the combine(0,1,2) rack is 1.
LedgerHandle lh = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);

//This ledger not adhering placement policy, the combine(0,1,2) rack is 1 and this ledger is not closed.
LedgerHandle lh2 = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);

int entrySize = 10;
for (int i = 0; i < entrySize; i++) {
lh.addEntry(data);
lh2.addEntry(data);
}
lh.close();

LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
LedgerMetadata metadata2 = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue();

assertTrue(metadata.isClosed());
assertFalse(metadata2.isClosed());

int minNumRacksPerWriteQuorumConfValue = 2;

ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
Expand Down Expand Up @@ -1276,6 +1286,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
assertNotNull(stat);

baseConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true);
baseConf.setOpenLedgerRereplicationGracePeriod("1000");
BookKeeper bookKeeper = new BookKeeperTestClient(baseClientConf) {
@Override
protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
Expand Down Expand Up @@ -1310,9 +1321,15 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
}

Awaitility.await().untilAsserted(() -> {
LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
List<BookieId> newBookies = metadata.getAllEnsembles().get(0L);
LedgerMetadata lhMetadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
List<BookieId> newBookies = lhMetadata.getAllEnsembles().get(0L);
assertTrue(newBookies.contains(newBookieId));
assertTrue(lhMetadata.isClosed());

LedgerMetadata lh2Metadata = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue();
List<BookieId> newBookies2 = lh2Metadata.getAllEnsembles().get(0L);
assertTrue(!newBookies2.contains(newBookieId));
assertTrue(!lh2Metadata.isClosed());
});

Awaitility.await().untilAsserted(() -> {
Expand Down