Skip to content

Commit 59c4b41

Browse files
BenHuddleston authored and daverigby committed
Copy chain Positions between first and second chain
If we do not do this, the nodeWriteSeqno of a replica that existed and acked in the secondChain and was then moved to the firstChain will be incorrect until it acks again. This bug is benign because we use the ack count, not the nodeWriteSeqno, to determine whether we should commit, so we do not fail to commit anything. No monotonic invariant exceptions are thrown because a new Position is created in the second chain, which effectively resets the monotonic nodeWriteSeqno value.
Change-Id: I881d4c66ed590169830656b1ca7979de37165967
Reviewed-on: http://review.couchbase.org/112844
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent be3f716 commit 59c4b41

File tree

2 files changed

+95
-13
lines changed

2 files changed

+95
-13
lines changed

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ struct ActiveDurabilityMonitor::State {
7777
void setReplicationTopology(const nlohmann::json& topology,
7878
CompletedQueue& toComplete);
7979

80+
/**
81+
* Copy the Positions from the old chain to the new
82+
*/
83+
void copyChainPositions(ReplicationChain& oldChain,
84+
ReplicationChain& newChain);
85+
8086
void addSyncWrite(const void* cookie, queued_item item);
8187

8288
/**
@@ -1323,20 +1329,20 @@ void ActiveDurabilityMonitor::State::setReplicationTopology(
13231329
// Copy over the trackedWrites position for all nodes which still exist
13241330
// in the new chain. This ensures that if we manually set the HPS on the
13251331
// firstChain then the secondChain will also be correctly set.
1326-
for (const auto& node : firstChain->positions) {
1327-
auto newNode = newFirstChain->positions.find(node.first);
1328-
if (newNode != newFirstChain->positions.end()) {
1329-
newNode->second = node.second;
1330-
}
1332+
copyChainPositions(*firstChain, *newFirstChain);
1333+
if (newSecondChain) {
1334+
// This stage should never matter because we will find the node in
1335+
// the firstChain and return early from processSeqnoAck. Added for
1336+
// the sake of completeness.
1337+
// @TODO make iterators optional and remove this
1338+
copyChainPositions(*firstChain, *newSecondChain);
13311339
}
13321340
}
13331341

1334-
if (secondChain && newSecondChain) {
1335-
for (const auto& node : secondChain->positions) {
1336-
auto newNode = newSecondChain->positions.find(node.first);
1337-
if (newNode != newSecondChain->positions.end()) {
1338-
newNode->second = node.second;
1339-
}
1342+
if (secondChain) {
1343+
copyChainPositions(*secondChain, *newFirstChain);
1344+
if (newSecondChain) {
1345+
copyChainPositions(*secondChain, *newSecondChain);
13401346
}
13411347
}
13421348

@@ -1383,6 +1389,16 @@ void ActiveDurabilityMonitor::State::setReplicationTopology(
13831389
cleanUpTrackedWritesPostTopologyChange(toComplete);
13841390
}
13851391

1392+
/**
 * Copy per-node Positions from oldChain into newChain.
 *
 * For every entry in oldChain.positions, look the same key up in
 * newChain.positions; if it is present, overwrite the new chain's value with
 * the old chain's value. Nodes that are not present in newChain are skipped,
 * and entries of newChain that have no counterpart in oldChain are left
 * untouched.
 *
 * @param oldChain chain whose positions are read
 * @param newChain chain whose matching positions are overwritten
 */
void ActiveDurabilityMonitor::State::copyChainPositions(
1393+
ReplicationChain& oldChain, ReplicationChain& newChain) {
1394+
for (const auto& node : oldChain.positions) {
1395+
// Only nodes that also exist in the new chain carry their state over
auto newNode = newChain.positions.find(node.first);
1396+
if (newNode != newChain.positions.end()) {
1397+
newNode->second = node.second;
1398+
}
1399+
}
1400+
}
1401+
13861402
void ActiveDurabilityMonitor::State::performQueuedAckForChain(
13871403
const DurabilityMonitor::ReplicationChain& chain,
13881404
CompletedQueue& toCommit) {

engines/ep/tests/module_tests/vbucket_durability_test.cc

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1488,8 +1488,9 @@ void VBucketDurabilityTest::testConvertPDMToADMWithNullTopology(
14881488
testConvertPDMToADMWithNullTopologySetup(initialState, writes);
14891489

14901490
// ns_server then sets the topology
1491-
vbucket->setState(vbucket_state_active,
1492-
{{"topology", nlohmann::json::array({{active}})}});
1491+
vbucket->setState(
1492+
vbucket_state_active,
1493+
{{"topology", nlohmann::json::array({{active}, {active}})}});
14931494

14941495
auto& adm = VBucketTestIntrospector::public_getActiveDM(*vbucket);
14951496

@@ -2130,6 +2131,71 @@ TEST_P(VBucketDurabilityTest, ActiveDM_DoubleSetVBState) {
21302131
}
21312132
}
21322133

2134+
// Checks that the node write seqno recorded for a replica that acked while it
// was in the second chain is still reported after a topology change moves that
// replica into the first chain, and that the tracked Prepares are subsequently
// committed once the active persists.
TEST_P(EPVBucketDurabilityTest,
2135+
ActiveDM_SecondChainNodeWriteSeqnoMaintainedOntopologyChange) {
2136+
// Set a topology with 2 chains - i.e. approaching the end of a replica swap
2137+
// rebalance
2138+
nlohmann::json topology({{"topology",
2139+
nlohmann::json::array({{active, replica1},
2140+
{active, replica2}})}});
2141+
vbucket->setState(vbucket_state_active, topology);
2142+
2143+
// Setup: queue two Prepares into the ADM
2144+
auto& adm = VBucketTestIntrospector::public_getActiveDM(*vbucket);
2145+
ASSERT_EQ(0, adm.getNumTracked());
2146+
using namespace cb::durability;
2147+
const std::vector<SyncWriteSpec> seqnos{
2148+
{1, false, Level::PersistToMajority}, 2};
2149+
testAddPrepare(seqnos);
2150+
ASSERT_EQ(seqnos.size(), adm.getNumTracked());
2151+
// checkForCommit will be called after every normal vBucket op and will
2152+
// set the HPS for us
2153+
adm.checkForCommit();
2154+
2155+
// Prepare at seqno 1 requires persistence so we have not moved HPS
2156+
ASSERT_EQ(0, adm.getHighPreparedSeqno());
2157+
ASSERT_EQ(2, adm.getNumTracked());
2158+
2159+
// replica2 (a member of the second chain only) acks up to seqno 2
2160+
adm.seqnoAckReceived(replica2, 2);
2161+
ASSERT_EQ(2, adm.getNodeWriteSeqno(replica2));
2162+
ASSERT_EQ(2, adm.getNumTracked());
2163+
2164+
// Complete the "rebalance" by setting the topology to a single chain with
2165+
// the new replica
2166+
topology = nlohmann::json(
2167+
{{"topology", nlohmann::json::array({{active, replica2}})}});
2168+
vbucket->setState(vbucket_state_active, topology);
2169+
2170+
// Nothing has yet been committed because the active must persist
2171+
EXPECT_EQ(2, adm.getNumTracked());
2172+
EXPECT_EQ(0, adm.getHighPreparedSeqno());
2173+
2174+
// The node write seqno is transferred in the topology change
2175+
EXPECT_EQ(2, adm.getNodeWriteSeqno(replica2));
2176+
2177+
// Notify persistence and commit our prepares
2178+
vbucket->setPersistenceSeqno(2);
2179+
adm.notifyLocalPersistence();
2180+
EXPECT_EQ(0, adm.getNumTracked());
2181+
2182+
// Client should be notified.
2183+
EXPECT_EQ(SWCompleteTrace(2 /*count*/, cookie, ENGINE_SUCCESS),
2184+
swCompleteTrace);
2185+
2186+
// After commit() check that the keys are now accessible and appear as
2187+
// committed.
2188+
for (const auto& spec : seqnos) {
2189+
auto key = makeStoredDocKey("key"s + std::to_string(spec.seqno));
2190+
auto result = vbucket->fetchValidValue(WantsDeleted::No,
2191+
TrackReference::No,
2192+
QueueExpired::No,
2193+
vbucket->lockCollections(key));
2194+
ASSERT_TRUE(result.storedValue);
2195+
EXPECT_TRUE(result.storedValue->isCommitted());
2196+
}
2197+
}
2198+
21332199
TEST_P(VBucketDurabilityTest, IgnoreAckAtTakeoverDead) {
21342200
// Queue some Prepares into the PDM
21352201
const auto& adm = VBucketTestIntrospector::public_getActiveDM(*vbucket);

0 commit comments

Comments
 (0)