Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,66 +257,66 @@ void doPlacementPolicyCheck(Long ledgerId,
LedgerMetadata metadata = metadataVer.getValue();
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
if (metadata.isClosed()) {
boolean foundSegmentNotAdheringToPlacementPolicy = false;
boolean foundSegmentSoftlyAdheringToPlacementPolicy = false;
for (Map.Entry<Long, ? extends List<BookieId>> ensemble : metadata
.getAllEnsembles().entrySet()) {
long startEntryIdOfSegment = ensemble.getKey();
List<BookieId> ensembleOfSegment = ensemble.getValue();
EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin
.isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
foundSegmentNotAdheringToPlacementPolicy = true;
LOG.warn(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+ "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+ "EnsemblePlacementPolicy",
if (!metadata.isClosed()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better skip checking the last segment if the ledger is OPEN; otherwise, the replication worker will fence a lot of ledgers, which can force clients to create new ledgers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

If there is a rack failure and enforceMinNumRacksPerWriteQuorum=false, the open segment will never satisfy the rack distribution, and the auditor will cause every open ledger in the cluster to be fenced. We can try to repair closed ledgers or segments, but for open segments, frequently fencing ledgers can be quite harmful during rack failures.

We should be consistent with the behavior of the bookie client's write side, especially for open segments.

if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} is not yet closed, but do not skipping the placementPolicy"
+ "check analysis for now", ledgerId);
}
}
boolean foundSegmentNotAdheringToPlacementPolicy = false;
boolean foundSegmentSoftlyAdheringToPlacementPolicy = false;
for (Map.Entry<Long, ? extends List<BookieId>> ensemble : metadata
.getAllEnsembles().entrySet()) {
long startEntryIdOfSegment = ensemble.getKey();
List<BookieId> ensembleOfSegment = ensemble.getValue();
EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin
.isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
foundSegmentNotAdheringToPlacementPolicy = true;
LOG.warn(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+ "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+ "EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
} else if (segmentAdheringToPlacementPolicy
== EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) {
foundSegmentSoftlyAdheringToPlacementPolicy = true;
if (LOG.isDebugEnabled()) {
LOG.debug(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {}"
+ " having writeQuorumSize: {} and ackQuorumSize: {} is"
+ " softly adhering to EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
} else if (segmentAdheringToPlacementPolicy
== EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) {
foundSegmentSoftlyAdheringToPlacementPolicy = true;
if (LOG.isDebugEnabled()) {
LOG.debug(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {}"
+ " having writeQuorumSize: {} and ackQuorumSize: {} is"
+ " softly adhering to EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
}
}
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
//If user enable repaired, mark this ledger to under replication manager.
if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) {
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
Collections.emptyList()).whenComplete((res, e) -> {
if (e != null) {
LOG.error("For ledger: {}, the placement policy not adhering bookie "
+ "storage, mark it to under replication manager failed.",
ledgerId, e);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("For ledger: {}, the placement policy not adhering bookie"
+ " storage, mark it to under replication manager", ledgerId);
}
});
}
} else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck
.incrementAndGet();
}
numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy"
+ "check analysis for now", ledgerId);
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
//If user enable repaired, mark this ledger to under replication manager.
if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) {
ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
Collections.emptyList()).whenComplete((res, e) -> {
if (e != null) {
LOG.error("For ledger: {}, the placement policy not adhering bookie "
+ "storage, mark it to under replication manager failed.",
ledgerId, e);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("For ledger: {}, the placement policy not adhering bookie"
+ " storage, mark it to under replication manager", ledgerId);
}
});
}
} else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck
.incrementAndGet();
}
numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet();

iterCallback.processResult(BKException.Code.OK, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,6 @@ private Set<LedgerFragment> getNeedRepairedPlacementNotAdheringFragments(LedgerH
LedgerMetadata metadata = metadataVer.getValue();
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
if (!metadata.isClosed()) {
return;
}
Long curEntryId = null;
EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,12 +1239,22 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
//This ledger not adhering placement policy, the combine(0,1,2) rack is 1.
LedgerHandle lh = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);

//This ledger not adhering placement policy, the combine(0,1,2) rack is 1 and this ledger is not closed.
LedgerHandle lh2 = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);

int entrySize = 10;
for (int i = 0; i < entrySize; i++) {
lh.addEntry(data);
lh2.addEntry(data);
}
lh.close();

LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
LedgerMetadata metadata2 = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue();

assertTrue(metadata.isClosed());
assertFalse(metadata2.isClosed());

int minNumRacksPerWriteQuorumConfValue = 2;

ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
Expand All @@ -1259,7 +1269,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value",
1, ledgersNotAdheringToPlacementPolicyGuage.getSample());
2, ledgersNotAdheringToPlacementPolicyGuage.getSample());
Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value",
Expand All @@ -1276,6 +1286,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
assertNotNull(stat);

baseConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true);
baseConf.setOpenLedgerRereplicationGracePeriod("1000");
BookKeeper bookKeeper = new BookKeeperTestClient(baseClientConf) {
@Override
protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
Expand Down Expand Up @@ -1310,9 +1321,15 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig
}

Awaitility.await().untilAsserted(() -> {
LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
List<BookieId> newBookies = metadata.getAllEnsembles().get(0L);
LedgerMetadata lhMetadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
List<BookieId> newBookies = lhMetadata.getAllEnsembles().get(0L);
assertTrue(newBookies.contains(newBookieId));
assertTrue(lhMetadata.isClosed());

LedgerMetadata lh2Metadata = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue();
List<BookieId> newBookies2 = lh2Metadata.getAllEnsembles().get(0L);
assertTrue(newBookies2.contains(newBookieId));
assertTrue(lh2Metadata.isClosed());
});

Awaitility.await().untilAsserted(() -> {
Expand Down