
Commit c8ee0a8

Refactor: Split ADM::State::setReplicationTopology
The function is long and is logically a series of separate steps that must be completed in the correct order. Split the function into several smaller functions, one per step, and add comments where one step is required to be performed before another.

Change-Id: I960a3f36e0d975dfd0f65b3dc138260a11ca3c54
Reviewed-on: http://review.couchbase.org/112851
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent 22e2e4d commit c8ee0a8
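
For orientation, the new shape of setReplicationTopology and the ordering constraints between its steps can be summarised as follows. This is a sketch distilled from the diff below, with the real ADM types replaced by stubs and the chain construction/ownership details omitted, so the ordering reads on its own:

    // Sketch only: stub types stand in for ReplicationChain / CompletedQueue.
    struct Chain {};
    struct Queue {};

    struct StateSketch {
        // Step helpers named after the functions introduced by this commit.
        void transitionFromNullTopology(Chain&) {}
        void copyChainPositions(Chain*, Chain&, Chain*, Chain*) {}
        void abortNoLongerPossibleSyncWrites(Chain&, Chain*, Queue&) {}
        void performQueuedAckForChain(const Chain&, Queue&) {}
        void cleanUpTrackedWritesPostTopologyChange(Queue&) {}

        Chain* firstChain = nullptr;
        Chain* secondChain = nullptr;

        void setReplicationTopology(Chain& newFirst, Chain* newSecond, Queue& toComplete) {
            // 1) (Not shown) build the new chains and reset each in-flight
            //    SyncWrite's topology so it points at them.
            // 2) The null-topology fix-up must follow step 1, or the
            //    SyncWrites would still have no chain.
            if (!firstChain) {
                transitionFromNullTopology(newFirst);
            }
            // 3) Carry positions (iterators/seqnos) from the old chains to the new.
            copyChainPositions(firstChain, newFirst, secondChain, newSecond);
            // 4) Abort finite-timeout writes the new topology makes impossible.
            abortNoLongerPossibleSyncWrites(newFirst, newSecond, toComplete);
            // 5) Only now replace the old chains.
            firstChain = &newFirst;
            secondChain = newSecond;
            // 6) Apply queued acks for newly-added nodes, then commit anything
            //    that is now satisfied.
            performQueuedAckForChain(*firstChain, toComplete);
            if (secondChain) {
                performQueuedAckForChain(*secondChain, toComplete);
            }
            cleanUpTrackedWritesPostTopologyChange(toComplete);
        }
    };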


engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 167 additions & 80 deletions
@@ -78,11 +78,11 @@ struct ActiveDurabilityMonitor::State {
                       CompletedQueue& toComplete);
 
     /**
-     * Copy the Positions from the old chain to the new
+     * Add a new SyncWrite
+     *
+     * @param cookie Connection to notify on completion
+     * @param item The prepare
      */
-    void copyChainPositions(ReplicationChain& oldChain,
-                            ReplicationChain& newChain);
-
     void addSyncWrite(const void* cookie, queued_item item);
 
     /**
@@ -192,6 +192,72 @@ struct ActiveDurabilityMonitor::State {
      */
     void updateHighPreparedSeqno(CompletedQueue& completed);
 
+    void updateHighCompletedSeqno();
+
+protected:
+    /**
+     * Set up the newFirstChain correctly if we previously had no topology.
+     *
+     * When a replica is promoted to active, the trackedWrites are moved from
+     * the PDM to the ADM. This ADM will have a null topology and the active
+     * node iterator will not exist. When we move from a null topology to a
+     * topology, we need to correct the HPS iterator to ensure that the HPS is
+     * correct post topology change. The HPS iterator is set to the
+     * corresponding SyncWrite in trackedWrites. If we have just received a Disk
+     * snapshot as PDM and highPreparedSeqno is not equal to anything in
+     * trackedWrites then it will be set to the highest seqno less than the
+     * highPreparedSeqno.
+     *
+     * @param newFirstChain our new firstChain
+     */
+    void transitionFromNullTopology(ReplicationChain& newFirstChain);
+
+    /**
+     * Move the Positions (iterators and write/ack seqnos) from the old chains
+     * to the new chains. Copies between two chains too so that a node that
+     * previously existed in the second chain and now only exists in the first
+     * will have the correct iterators and seqnos (this is a normal swap
+     * rebalance scenario).
+     *
+     * @param firstChain old first chain
+     * @param newFirstChain new first chain
+     * @param secondChain old second chain
+     * @param newSecondChain new second chain
+     */
+    void copyChainPositions(ReplicationChain* firstChain,
+                            ReplicationChain& newFirstChain,
+                            ReplicationChain* secondChain,
+                            ReplicationChain* newSecondChain);
+
+    /**
+     * Copy the Positions from the given old chain to the given new chain.
+     */
+    void copyChainPositionsInner(ReplicationChain& oldChain,
+                                 ReplicationChain& newChain);
+
+    /**
+     * A topology change may make durable writes impossible in the case of a
+     * failover. We abort the in-flight SyncWrites to allow the client to retry
+     * them to provide an earlier response. This retry would then be met with a
+     * durability impossible error if the cluster had not yet been healed.
+     *
+     * Note, we cannot abort any in-flight SyncWrites with an infinite
+     * timeout as these must be committed. They will eventually be committed
+     * when the cluster is healed or the number of replicas is dropped such that
+     * they are satisfied. These SyncWrites may exist due to either warmup or
+     * a Passive->Active transition.
+     *
+     * @param newFirstChain Chain against which we check if durability is
+     *        possible
+     * @param newSecondChain Chain against which we check if durability is
+     *        possible
+     * @param[out] toAbort Container which has all SyncWrites to abort appended
+     *             to it
+     */
+    void abortNoLongerPossibleSyncWrites(ReplicationChain& newFirstChain,
+                                         ReplicationChain* newSecondChain,
+                                         CompletedQueue& toAbort);
+
     /**
      * Perform the manual ack (from the map of queuedSeqnoAcks) that is
      * required at rebalance for the given chain
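
To make the transitionFromNullTopology documentation above concrete, here is a small self-contained sketch of the "fence" logic it describes, using plain seqno integers instead of the ADM types (illustrative only; the real implementation appears further down in this diff):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Illustrative only: plain seqnos stand in for trackedWrites entries.
    // Returns the seqno of the prepare the HPS iterator should point at:
    // the highest tracked prepare at or below the "fence" (the inherited
    // highPreparedSeqno, or the persistence seqno if that has moved further).
    uint64_t hpsIteratorTarget(uint64_t inheritedHps,
                               uint64_t persistenceSeqno,
                               const std::vector<uint64_t>& trackedSeqnos) {
        const uint64_t fence = std::max(inheritedHps, persistenceSeqno);
        uint64_t target = 0; // 0 => iterator stays at "before begin"
        for (auto seqno : trackedSeqnos) { // trackedWrites is seqno-ordered
            if (seqno > fence) {
                break;
            }
            target = seqno;
        }
        return target;
    }

    // e.g. prepares at {3, 5, 8}, inherited HPS 4, persistence seqno 6:
    // fence = 6, so the iterator lands on the prepare at seqno 5 (the HPS
    // value itself is then set to the fence, 6).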
@@ -203,8 +269,6 @@ struct ActiveDurabilityMonitor::State {
     void performQueuedAckForChain(const ReplicationChain& chain,
                                   CompletedQueue& toCommit);
 
-    void updateHighCompletedSeqno();
-
     /**
      * A topology change may trigger a commit due to number of replicas
      * changing. Generally we commit by moving the HPS or receiving a seqno ack
@@ -1278,82 +1342,131 @@ void ActiveDurabilityMonitor::State::setReplicationTopology(
     auto newFirstChain =
             makeChain(DurabilityMonitor::ReplicationChainName::First, fChain);
 
-    // Apply the new topology to all in-flight SyncWrites
+    // Apply the new topology to all in-flight SyncWrites.
     for (auto& write : trackedWrites) {
         write.resetTopology(*newFirstChain, newSecondChain.get());
     }
 
-    if (!firstChain && newFirstChain) {
-        // MB-35275. When a replica is promoted to active, the trackedWrites are
-        // moved from the PDM to the ADM. This ADM will have a null topology and
-        // the active node iterator will not exist. When we move from a null
-        // topology to a topology, we need to correct the HPS iterator to ensure
-        // that the HPS is correct post topology change. The HPS iterator is set
-        // to the corresponding SyncWrite in trackedWrites. If we have just
-        // received a Disk snapshot as PDM and highPreparedSeqno is not equal to
-        // anything in trackedWrites then it will be set to the highest seqno
-        // less than the highPreparedSeqno.
-        if (!trackedWrites.empty()) {
-            // We need to manually set the values for the HPS iterator
-            // (newFirstChain->positions.begin()) and "ack" the nodes so that we
-            // can commit if possible by checking if they are satisfied.
-
-            // It may be the case that we had a PersistToMajority prepare in the
-            // PDM before moving to ADM that had not yet been persisted
-            // (trackedWrites.back().getBySeqno() != highPreparedSeqno). If we
-            // have persisted this prepare in between transitioning from PDM
-            // to ADM with null topology and transitioning from ADM with null
-            // topology to ADM with topology then we may need to move our HPS
-            // further than the highPreparedSeqno that we inherited from the PDM
-            // due to persistence.
-            auto fence = std::max(static_cast<uint64_t>(highPreparedSeqno),
-                                  adm.vb.getPersistenceSeqno());
-            auto& activePos =
-                    newFirstChain->positions.find(newFirstChain->active)
-                            ->second;
-            Container::iterator it = trackedWrites.begin();
-            while (it != trackedWrites.end()) {
-                if (it->getBySeqno() <= static_cast<int64_t>(fence)) {
-                    activePos.it = it;
-                    it->ack(newFirstChain->active);
-                    it = std::next(it);
-                } else {
-                    break;
-                }
-            }
+    // Set the HPS correctly if we are transitioning from a null topology (may
+    // be in-flight SyncWrites from a PDM that we use to do this). Must be done
+    // after we have set the topology of the SyncWrites or they will have
+    // no chain.
+    if (!firstChain) {
+        transitionFromNullTopology(*newFirstChain);
+    }
+
+    // Copy the iterators from the old chains to the new chains.
+    copyChainPositions(firstChain.get(),
+                       *newFirstChain,
+                       secondChain.get(),
+                       newSecondChain.get());
+
+    // We have already reset the topology of the in flight SyncWrites so that
+    // they do not contain any invalid pointers to ReplicationChains post
+    // topology change.
+    abortNoLongerPossibleSyncWrites(
+            *newFirstChain, newSecondChain.get(), toComplete);
+
+    // We have now reset all the topology for SyncWrites so we can dispose of
+    // the old chain (by overwriting it with the new one).
+    firstChain = std::move(newFirstChain);
+    secondChain = std::move(newSecondChain);
+
+    // Manually ack any nodes that did not previously exist in either chain
+    performQueuedAckForChain(*firstChain, toComplete);
 
-            activePos.lastWriteSeqno = fence;
-            highPreparedSeqno = fence;
+    if (secondChain) {
+        performQueuedAckForChain(*secondChain, toComplete);
+    }
+
+    // Commit if possible
+    cleanUpTrackedWritesPostTopologyChange(toComplete);
+}
+
+void ActiveDurabilityMonitor::State::transitionFromNullTopology(
+        ReplicationChain& newFirstChain) {
+    if (!trackedWrites.empty()) {
+        // We need to manually set the values for the HPS iterator
+        // (newFirstChain->positions.begin()) and "ack" the nodes so that we
+        // can commit if possible by checking if they are satisfied.
+
+        // It may be the case that we had a PersistToMajority prepare in the
+        // PDM before moving to ADM that had not yet been persisted
+        // (trackedWrites.back().getBySeqno() != highPreparedSeqno). If we
+        // have persisted this prepare in between transitioning from PDM
+        // to ADM with null topology and transitioning from ADM with null
+        // topology to ADM with topology then we may need to move our HPS
+        // further than the highPreparedSeqno that we inherited from the PDM
+        // due to persistence.
+        auto fence = std::max(static_cast<uint64_t>(highPreparedSeqno),
+                              adm.vb.getPersistenceSeqno());
+        auto& activePos =
+                newFirstChain.positions.find(newFirstChain.active)->second;
+        Container::iterator it = trackedWrites.begin();
+        while (it != trackedWrites.end()) {
+            if (it->getBySeqno() <= static_cast<int64_t>(fence)) {
+                activePos.it = it;
+                it->ack(newFirstChain.active);
+                it = std::next(it);
+            } else {
+                break;
+            }
         }
-    } else if (firstChain) {
+
+        activePos.lastWriteSeqno = fence;
+        highPreparedSeqno = fence;
+    }
+}
+
+void ActiveDurabilityMonitor::State::copyChainPositions(
+        ReplicationChain* firstChain,
+        ReplicationChain& newFirstChain,
+        ReplicationChain* secondChain,
+        ReplicationChain* newSecondChain) {
+    if (firstChain) {
         // Copy over the trackedWrites position for all nodes which still exist
         // in the new chain. This ensures that if we manually set the HPS on the
         // firstChain then the secondChain will also be correctly set.
-        copyChainPositions(*firstChain, *newFirstChain);
+        copyChainPositionsInner(*firstChain, newFirstChain);
         if (newSecondChain) {
             // This stage should never matter because we will find the node in
             // the firstChain and return early from processSeqnoAck. Added for
             // the sake of completeness.
             // @TODO make iterators optional and remove this
-            copyChainPositions(*firstChain, *newSecondChain);
+            copyChainPositionsInner(*firstChain, *newSecondChain);
         }
     }
 
     if (secondChain) {
-        copyChainPositions(*secondChain, *newFirstChain);
+        copyChainPositionsInner(*secondChain, newFirstChain);
         if (newSecondChain) {
-            copyChainPositions(*secondChain, *newSecondChain);
+            copyChainPositionsInner(*secondChain, *newSecondChain);
+        }
+    }
+}
+
+void ActiveDurabilityMonitor::State::copyChainPositionsInner(
+        ReplicationChain& oldChain, ReplicationChain& newChain) {
+    for (const auto& node : oldChain.positions) {
+        auto newNode = newChain.positions.find(node.first);
+        if (newNode != newChain.positions.end()) {
+            newNode->second = node.second;
         }
     }
+}
 
+void ActiveDurabilityMonitor::State::abortNoLongerPossibleSyncWrites(
+        ReplicationChain& newFirstChain,
+        ReplicationChain* newSecondChain,
+        CompletedQueue& toAbort) {
     // If durability is not possible for the new chains, then we should abort
     // any in-flight SyncWrites that do not have an infinite timeout so that the
     // client can decide what to do. We do not abort any infinite timeout
     // SyncWrites as we MUST complete them as they exist due to a warmup or
     // Passive->Active transition. We have already reset the topology of the in
     // flight SyncWrites so that they do not contain any invalid pointers post
     // topology change.
-    if (!(newFirstChain && newFirstChain->isDurabilityPossible() &&
+    if (!(newFirstChain.isDurabilityPossible() &&
          (!newSecondChain || newSecondChain->isDurabilityPossible()))) {
         // We can't use a for loop with iterators here because they will be
         // modified to point to invalid memory as we use std::list.splice in
@@ -1364,39 +1477,13 @@ void ActiveDurabilityMonitor::State::setReplicationTopology(
                 // Grab the next itr before we overwrite ours to point to a
                 // different list.
                 auto next = std::next(itr);
-                toComplete.enqueue(*this,
-                                   removeSyncWrite(trackedWrites.begin()));
+                toAbort.enqueue(*this, removeSyncWrite(trackedWrites.begin()));
                 itr = next;
             } else {
                 itr++;
             }
         }
     }
-
-    // We have now reset all the topology for SyncWrites so we can dispose of
-    // the old chain (by overwriting it with the new one).
-    firstChain = std::move(newFirstChain);
-    secondChain = std::move(newSecondChain);
-
-    // Manually ack any nodes that did not previously exist in either chain
-    performQueuedAckForChain(*firstChain, toComplete);
-
-    if (secondChain) {
-        performQueuedAckForChain(*secondChain, toComplete);
-    }
-
-    // Commit if possible
-    cleanUpTrackedWritesPostTopologyChange(toComplete);
-}
-
-void ActiveDurabilityMonitor::State::copyChainPositions(
-        ReplicationChain& oldChain, ReplicationChain& newChain) {
-    for (const auto& node : oldChain.positions) {
-        auto newNode = newChain.positions.find(node.first);
-        if (newNode != newChain.positions.end()) {
-            newNode->second = node.second;
-        }
-    }
 }
 
 void ActiveDurabilityMonitor::State::performQueuedAckForChain(
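
As a footnote for readers skimming the diff: the position-copy step (copyChainPositionsInner) amounts to copying map entries only for nodes present in both the old and new chain. A self-contained illustration with simplified types (std::unordered_map and int stand in for the real position map and Position; not the ADM code):

    #include <string>
    #include <unordered_map>

    // Simplified stand-ins: a "position" is just an int here.
    using Positions = std::unordered_map<std::string, int>;

    // Copy each node's position from oldChain to newChain, but only for nodes
    // that still exist in the new chain (nodes removed by the topology change
    // are dropped; nodes new to the chain keep their default position).
    void copyPositions(const Positions& oldChain, Positions& newChain) {
        for (const auto& node : oldChain) {
            auto it = newChain.find(node.first);
            if (it != newChain.end()) {
                it->second = node.second;
            }
        }
    }

    // e.g. old = {{"active", 7}, {"replica1", 5}},
    //      new = {{"active", 0}, {"replica2", 0}}
    // -> new becomes {{"active", 7}, {"replica2", 0}}.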
