Skip to content

Commit 5191f1c

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-35652: Remove race between resolvedQ and setReplicationTopology
Currently we have a race between processing the resolvedQueue and setting the replication topology. This happens because processing the resolvedQueue requires use of the ReplicationChain pointers in each SyncWrite object. We use the ReplicationChain to see if a SyncWrite is satisfied when processing the resolvedQueue. We do not update the pointers to the new ReplicationChains for the SyncWrites in the resolvedQueue. This means we could attempt to use a freed pointer when processing the resolvedQueue. This problem existed before the processing of the queue was moved to a separate task, but was exacerbated by it due to timings. Fix this by setting a status in each SyncWrite when we know how to completed it and using the status instead of the chain pointers. We can then invalidate the pointers when removing the SyncWrites from trackedWrites. Change-Id: I021e080d93d10d9ec7c286e24824feb08d80cc58 Reviewed-on: http://review.couchbase.org/113643 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 83f4ab4 commit 5191f1c

File tree

5 files changed

+99
-24
lines changed

5 files changed

+99
-24
lines changed

engines/ep/src/durability/active_durability_monitor.cc

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,10 @@ struct ActiveDurabilityMonitor::State {
157157
* Remove the given SyncWrte from tracking.
158158
*
159159
* @param it The iterator to the SyncWrite to be removed
160+
* @param status The SyncWriteStatus to set the SyncWrite to as we remove it
160161
* @return the removed SyncWrite.
161162
*/
162-
SyncWrite removeSyncWrite(Container::iterator it);
163+
SyncWrite removeSyncWrite(Container::iterator it, SyncWriteStatus status);
163164

164165
/**
165166
* Logically 'moves' forward the High Prepared Seqno to the last
@@ -732,11 +733,22 @@ void ActiveDurabilityMonitor::processCompletedSyncWriteQueue() {
732733
std::lock_guard<ResolvedQueue::ConsumerLock> lock(
733734
resolvedQueue->getConsumerLock());
734735
while (folly::Optional<SyncWrite> sw = resolvedQueue->try_dequeue(lock)) {
735-
if (sw->isSatisfied()) {
736+
switch (sw->getStatus()) {
737+
case SyncWriteStatus::Pending:
738+
case SyncWriteStatus::Completed:
739+
throw std::logic_error(
740+
"ActiveDurabilityMonitor::processCompletedSyncWriteQueue "
741+
"found a SyncWrite with unexpected state: " +
742+
to_string(sw->getStatus()));
743+
continue;
744+
case SyncWriteStatus::ToCommit:
736745
commit(*sw);
737-
} else {
746+
continue;
747+
case SyncWriteStatus::ToAbort:
738748
abort(*sw);
749+
continue;
739750
}
751+
folly::assume_unreachable();
740752
};
741753
}
742754

@@ -1001,11 +1013,17 @@ int64_t ActiveDurabilityMonitor::State::getNodeAckSeqno(
10011013
}
10021014

10031015
DurabilityMonitor::SyncWrite ActiveDurabilityMonitor::State::removeSyncWrite(
1004-
Container::iterator it) {
1016+
Container::iterator it, SyncWriteStatus status) {
10051017
if (it == trackedWrites.end()) {
10061018
throwException<std::logic_error>(__func__, "Position points to end");
10071019
}
10081020

1021+
it->setStatus(status);
1022+
// Reset the chains so that we don't attempt to use some possibly re-used
1023+
// memory if we have any bugs that still touch the chains after we remove
1024+
// the SyncWrite from trackedWrites.
1025+
it->initialiseChains(nullptr, nullptr);
1026+
10091027
Container::iterator prev;
10101028
// Note: iterators in trackedWrites are never singular, Container::end
10111029
// is used as placeholder element for when an iterator cannot point to
@@ -1149,7 +1167,8 @@ void ActiveDurabilityMonitor::State::processSeqnoAck(const std::string& node,
11491167

11501168
// Check if Durability Requirements satisfied now, and add for commit
11511169
if (posIt->isSatisfied()) {
1152-
toCommit.enqueue(*this, removeSyncWrite(posIt));
1170+
toCommit.enqueue(*this,
1171+
removeSyncWrite(posIt, SyncWriteStatus::ToCommit));
11531172
}
11541173
}
11551174

@@ -1175,7 +1194,8 @@ size_t ActiveDurabilityMonitor::wipeTracked() {
11751194
while (it != s->trackedWrites.end()) {
11761195
// Note: 'it' will be invalidated, so it will need to be reset
11771196
const auto next = std::next(it);
1178-
s->removeSyncWrite(it);
1197+
// Status does not matter, just nuking trackedWrites
1198+
s->removeSyncWrite(it, SyncWriteStatus::Pending);
11791199
removed++;
11801200
it = next;
11811201
}
@@ -1490,7 +1510,9 @@ void ActiveDurabilityMonitor::State::abortNoLongerPossibleSyncWrites(
14901510
// Grab the next itr before we overwrite ours to point to a
14911511
// different list.
14921512
auto next = std::next(itr);
1493-
toAbort.enqueue(*this, removeSyncWrite(trackedWrites.begin()));
1513+
toAbort.enqueue(*this,
1514+
removeSyncWrite(trackedWrites.begin(),
1515+
SyncWriteStatus::ToAbort));
14941516
itr = next;
14951517
} else {
14961518
itr++;
@@ -1524,9 +1546,10 @@ void ActiveDurabilityMonitor::State::cleanUpTrackedWritesPostTopologyChange(
15241546
// snapshot. We have to do this after we set the HPS otherwise we could
15251547
// end up with an ADM with lower HPS than the previous PDM.
15261548
if (it->isCompleted()) {
1527-
removeSyncWrite(it);
1549+
removeSyncWrite(it, SyncWriteStatus::Completed);
15281550
} else if (it->isSatisfied()) {
1529-
toCommit.enqueue(*this, removeSyncWrite(it));
1551+
toCommit.enqueue(*this,
1552+
removeSyncWrite(it, SyncWriteStatus::ToCommit));
15301553
}
15311554
it = next;
15321555
}
@@ -1556,7 +1579,8 @@ void ActiveDurabilityMonitor::State::removeExpired(
15561579
// Note: 'it' will be invalidated, so it will need to be reset
15571580
const auto next = std::next(it);
15581581

1559-
expired.enqueue(*this, removeSyncWrite(it));
1582+
expired.enqueue(*this,
1583+
removeSyncWrite(it, SyncWriteStatus::ToAbort));
15601584

15611585
it = next;
15621586
} else {
@@ -1621,7 +1645,8 @@ void ActiveDurabilityMonitor::State::updateHighPreparedSeqno(
16211645
const auto& pos = firstChain->positions.at(active);
16221646
Expects(pos.it != trackedWrites.end());
16231647
if (pos.it->isSatisfied()) {
1624-
completed.enqueue(*this, removeSyncWrite(pos.it));
1648+
completed.enqueue(
1649+
*this, removeSyncWrite(pos.it, SyncWriteStatus::ToCommit));
16251650
}
16261651
};
16271652

engines/ep/src/durability/durability_monitor_impl.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,3 +372,17 @@ std::string to_string(
372372
ss << "}";
373373
return ss.str();
374374
}
375+
376+
std::string to_string(SyncWriteStatus status) {
377+
switch (status) {
378+
case SyncWriteStatus::Pending:
379+
return "Pending";
380+
case SyncWriteStatus::ToCommit:
381+
return "ToCommit";
382+
case SyncWriteStatus::ToAbort:
383+
return "ToAbort";
384+
case SyncWriteStatus::Completed:
385+
return "Completed";
386+
}
387+
folly::assume_unreachable();
388+
}

engines/ep/src/durability/durability_monitor_impl.h

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,29 @@ class PassiveDurabilityMonitor;
3939
// topology.
4040
static const std::string UndefinedNode{};
4141

42+
/**
43+
* The status of an in-flight SyncWrite
44+
*/
45+
enum class SyncWriteStatus {
46+
// Still waiting for enough acks to commit or to timeout.
47+
Pending = 0,
48+
49+
// Should be committed, enough nodes have acked. Should not exist in
50+
// trackedWrites in this state.
51+
ToCommit,
52+
53+
// Should be aborted. Should not exist in trackedWrites in this state.
54+
ToAbort,
55+
56+
// A replica receiving a disk snapshot or a snapshot with a persist level
57+
// prepare may not remove the SyncWrite object from trackedWrites until
58+
// it has been persisted. This SyncWrite has been Completed but may still
59+
// exist in trackedWrites.
60+
Completed,
61+
};
62+
63+
std::string to_string(SyncWriteStatus status);
64+
4265
/**
4366
* Represents a tracked SyncWrite. It is mainly a wrapper around a pending
4467
* Prepare item.
@@ -151,25 +174,30 @@ class DurabilityMonitor::SyncWrite {
151174
return item;
152175
}
153176

154-
void setCompleted() {
155-
completed = true;
177+
void setStatus(SyncWriteStatus status) {
178+
this->status = status;
156179
}
157180

158181
/**
159182
* @return true if this SyncWrite has been logically completed
160183
*/
161184
bool isCompleted() const {
162-
return completed;
185+
return status == SyncWriteStatus::Completed;
186+
}
187+
188+
SyncWriteStatus getStatus() const {
189+
return status;
163190
}
164191

165-
private:
166192
/**
167193
* Performs sanity checks and initialise the replication chains
168194
* @param firstChain Pointer (may be null) to the first chain
169195
* @param secondChain Pointer (may be null) to second chain
170196
*/
171197
void initialiseChains(const ReplicationChain* firstChain,
172198
const ReplicationChain* secondChain);
199+
200+
private:
173201
/**
174202
* Calculate the ackCount for this SyncWrite using the given chain.
175203
*
@@ -226,14 +254,7 @@ class DurabilityMonitor::SyncWrite {
226254
/// Used for statistics (track how long SyncWrites take to complete).
227255
const std::chrono::steady_clock::time_point startTime;
228256

229-
/**
230-
* A replica receiving a disk snapshot or a snapshot with a persist level
231-
* prepare may not remove the SyncWrite object from trackedWrites until
232-
* it has been persisted. This field exists to distinguish prepares that
233-
* have been completed but still exist in trackedWrites from those that have
234-
* not yet been completed.
235-
*/
236-
bool completed = false;
257+
SyncWriteStatus status = SyncWriteStatus::Pending;
237258

238259
friend std::ostream& operator<<(std::ostream&, const SyncWrite&);
239260
};

engines/ep/src/durability/passive_durability_monitor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ void PassiveDurabilityMonitor::completeSyncWrite(
342342
// duplicates in trackedWrites in case it is not removed because it requires
343343
// persistence.
344344
Expects(!next->isCompleted());
345-
next->setCompleted();
345+
next->setStatus(SyncWriteStatus::Completed);
346346

347347
// HCS has moved, which could make some Prepare eligible for removal.
348348
s->checkForAndRemovePrepares();

engines/ep/tests/module_tests/durability_monitor_test.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,21 @@ TEST_P(ActiveDurabilityMonitorTest, SeqnoAckReceivedConcurrentDataRace) {
986986
EXPECT_EQ(makeStoredDocKey("key2"), items[1]->getKey());
987987
}
988988

989+
TEST_P(ActiveDurabilityMonitorTest,
990+
CommitTopologyWithSyncWriteInCompletedQueue) {
991+
auto& adm = getActiveDM();
992+
adm.setReplicationTopology(nlohmann::json({{active, replica1, replica2}}));
993+
DurabilityMonitorTest::addSyncWrites({1});
994+
995+
notifyPersistence(1, 1, 0);
996+
997+
setSeqnoAckReceivedPostProcessHook([this, &adm]() {
998+
adm.setReplicationTopology(nlohmann::json::array({{active, replica1}}));
999+
});
1000+
1001+
adm.seqnoAckReceived("replica1", 1);
1002+
}
1003+
9891004
// @todo: Refactor test suite and expand test cases
9901005
TEST_P(ActiveDurabilityMonitorPersistentTest,
9911006
SeqnoAckReceived_PersistToMajority) {

0 commit comments

Comments
 (0)