Skip to content

Commit d179ba0

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-35367: Remove potential race in seqno acking
From code inspection, looks like we could enter either the PDM::notifySnapshotEndReceived or PDM::notifyLocalPersistence functions and acquire a hps value to ack back to the active. The locking ensures that we never attempt to ack the same value twice, but if the first thread were to reach the check if hps != prevHps and get deschdeduled then a second thread could make it through the lock and ack a high value before the first. This would cause the active to throw monotonic invariant broken exceptions. Change-Id: Ib8cab59f8adb999302594f50057f327540e607c9 Reviewed-on: http://review.couchbase.org/112662 Reviewed-by: James Harrison <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent a35b2f3 commit d179ba0

File tree

4 files changed

+100
-25
lines changed

4 files changed

+100
-25
lines changed

engines/ep/src/durability/passive_durability_monitor.cc

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -204,50 +204,57 @@ size_t PassiveDurabilityMonitor::getNumAborted() const {
204204
}
205205

206206
void PassiveDurabilityMonitor::notifySnapshotEndReceived(uint64_t snapEnd) {
207-
int64_t prevHps{0};
208-
int64_t hps{0};
209-
{
207+
{ // state locking scope
210208
auto s = state.wlock();
211209
s->receivedSnapshotEnds.push({int64_t(snapEnd),
212210
vb.isReceivingDiskSnapshot()
213211
? CheckpointType::Disk
214212
: CheckpointType::Memory});
215213
// Maybe the new tracked Prepare is already satisfied and could be
216214
// ack'ed back to the Active.
217-
prevHps = s->highPreparedSeqno.lastWriteSeqno;
215+
auto prevHps = s->highPreparedSeqno.lastWriteSeqno;
218216
s->updateHighPreparedSeqno();
219-
hps = s->highPreparedSeqno.lastWriteSeqno;
217+
218+
// Store the seqno ack to send after we drop the state lock
219+
storeSeqnoAck(prevHps, s->highPreparedSeqno.lastWriteSeqno);
220220
}
221221

222-
// HPS may have not changed (e.g., a locally-non-satisfied PersistToMajority
223-
// Prepare has introduced a durability-fence), which would result in
224-
// re-acking the same HPS multiple times. Not wrong as HPS is weakly
225-
// monotonic at Active, but we want to avoid sending unnecessary messages.
226-
if (hps != prevHps) {
227-
Expects(hps > prevHps);
228-
vb.sendSeqnoAck(hps);
222+
if (notifySnapEndSeqnoAckPreProcessHook) {
223+
notifySnapEndSeqnoAckPreProcessHook();
229224
}
225+
226+
sendSeqnoAck();
230227
}
231228

232229
void PassiveDurabilityMonitor::notifyLocalPersistence() {
233-
int64_t prevHps{0};
234-
int64_t hps{0};
235-
{
230+
{ // state locking scope
236231
auto s = state.wlock();
237-
prevHps = s->highPreparedSeqno.lastWriteSeqno;
232+
auto prevHps = s->highPreparedSeqno.lastWriteSeqno;
238233
s->updateHighPreparedSeqno();
239-
hps = s->highPreparedSeqno.lastWriteSeqno;
234+
235+
// Store the seqno ack to send after we drop the state lock
236+
storeSeqnoAck(prevHps, s->highPreparedSeqno.lastWriteSeqno);
240237
}
241238

242-
// HPS may have not changed (e.g., we have just persisted a Majority Prepare
243-
// for which the HPS has been already increased at ADM::addSyncWrite), which
244-
// would result in re-acking the same HPS multiple times. Not wrong as HPS
245-
// is weakly monotonic at Active, but we want to avoid sending unnecessary
246-
// messages.
247-
if (hps != prevHps) {
248-
Expects(hps > prevHps);
249-
vb.sendSeqnoAck(hps);
239+
sendSeqnoAck();
240+
}
241+
242+
void PassiveDurabilityMonitor::storeSeqnoAck(int64_t prevHps, int64_t newHps) {
243+
if (prevHps != newHps) {
244+
auto seqno = seqnoToAck.wlock();
245+
if (*seqno < newHps) {
246+
*seqno = newHps;
247+
}
248+
}
249+
}
250+
251+
void PassiveDurabilityMonitor::sendSeqnoAck() {
252+
// Hold the lock throughout to ensure that we do not race with another ack
253+
auto seqno = seqnoToAck.wlock();
254+
if (*seqno != 0) {
255+
vb.sendSeqnoAck(*seqno);
250256
}
257+
*seqno = 0;
251258
}
252259

253260
std::string PassiveDurabilityMonitor::to_string(Resolution res) {

engines/ep/src/durability/passive_durability_monitor.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
142142
*/
143143
void notifySnapshotEndReceived(uint64_t snapEnd);
144144

145+
/**
146+
* Notify this PDM that some persistence has happened. Attempts to update
147+
* the HPS and ack back to the active.
148+
*/
145149
void notifyLocalPersistence() override;
146150

147151
/**
@@ -159,7 +163,27 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
159163
*/
160164
int64_t getHighestTrackedSeqno() const;
161165

166+
/**
167+
* Test only: Hook which if non-empty is called from
168+
* notifySnapshotEndReceived()
169+
*/
170+
std::function<void()> notifySnapEndSeqnoAckPreProcessHook;
171+
162172
protected:
173+
/**
174+
* Store the seqno ack that we should now send to the consumer. Overwrites
175+
* any outstanding ack not yet sent if the new value is greater.
176+
*
177+
* @param prevHps determines if we should send an ack or not
178+
* @param newHps new hps to ack
179+
*/
180+
void storeSeqnoAck(int64_t prevHps, int64_t newHps);
181+
182+
/**
183+
* Send, if we need to, a seqno ack to the active node.
184+
*/
185+
void sendSeqnoAck();
186+
163187
void toOStream(std::ostream& os) const override;
164188
/**
165189
* throw exception with the following error string:
@@ -181,6 +205,9 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
181205
struct State;
182206
folly::SynchronizedPtr<std::unique_ptr<State>> state;
183207

208+
/// Outstanding seqno ack to send to the active. 0 if no ack outstanding
209+
folly::Synchronized<int64_t> seqnoToAck{0};
210+
184211
// Necessary for implementing ADM(PDM&&)
185212
friend class ActiveDurabilityMonitor;
186213

engines/ep/tests/module_tests/durability_monitor_test.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3233,6 +3233,43 @@ TEST_P(ActiveDurabilityMonitorPersistentTest,
32333233
}
32343234
}
32353235

3236+
TEST_P(PassiveDurabilityMonitorTest, SendSeqnoAckRace) {
3237+
auto& pdm = getPassiveDM();
3238+
ASSERT_EQ(0, pdm.getNumTracked());
3239+
3240+
Monotonic<int64_t> ackedSeqno = 0;
3241+
3242+
// Set up our function hooks
3243+
pdm.notifySnapEndSeqnoAckPreProcessHook = [this, &pdm]() {
3244+
// We are outside the state lock at this point but have not yet called
3245+
// vb->sendSeqnoAck. Simulate this race by notifying persistence which
3246+
// should attempt to ack 2.
3247+
vb->setPersistenceSeqno(2);
3248+
pdm.notifyLocalPersistence();
3249+
};
3250+
3251+
// Test: Assert the monotonicity of the ack that we attempt to send
3252+
VBucketTestIntrospector::setSeqnoAckCb(
3253+
*vb, [&ackedSeqno](Vbid vbid, uint64_t hps) { ackedSeqno = hps; });
3254+
3255+
// Load two SyncWrites. For this test we want a snapshot end to attempt to
3256+
// seqno ack with seqno 1 but be "descheduled" before it can and for a
3257+
// persistence callback to ack with seqno 2 before the seqno ack for seqno 1
3258+
// completes. This could happen with the following two prepares in a single
3259+
// snapshot.
3260+
using namespace cb::durability;
3261+
addSyncWrite(1 /*seqno*/,
3262+
Requirements{Level::Majority, Timeout::Infinity()});
3263+
addSyncWrite(2 /*seqno*/,
3264+
Requirements{Level::PersistToMajority, Timeout::Infinity()});
3265+
ASSERT_EQ(2, pdm.getNumTracked());
3266+
3267+
// Should only be able to ack seqno 1 as we require persistence to ack seqno
3268+
// 2.
3269+
pdm.notifySnapshotEndReceived(2);
3270+
EXPECT_EQ(2, ackedSeqno);
3271+
}
3272+
32363273
INSTANTIATE_TEST_CASE_P(AllBucketTypes,
32373274
ActiveDurabilityMonitorTest,
32383275
STParameterizedBucketTest::allConfigValues(),

engines/ep/tests/module_tests/vbucket_utils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ class VBucketTestIntrospector {
3030
static PassiveDurabilityMonitor& public_getPassiveDM(VBucket& vb) {
3131
return vb.getPassiveDM();
3232
}
33+
34+
static void setSeqnoAckCb(VBucket& vb, SeqnoAckCallback func) {
35+
vb.seqnoAckCb = func;
36+
}
3337
};

0 commit comments

Comments
 (0)