Skip to content

Commit 4834fe3

Browse files
jameseh96daverigby
authored andcommitted
MB-34873: disk snapshots - Ack snapEnd seqno once persisted
Problem 1: Any prepare received by a replica from a disk snapshot may have deduped an earlier prepare of a higher level. For example, the following ops (for the same key) PRE(l=PersistToMajority) CMT PRE(l=Majority) CMT May be deduped to PRE(l=Majority) CMT If we acked this prepare immediately, were we to (say) be promoted to active and immediately crash, we may be left with no value on disk for the key BUT the PersistToMajority op may have returned SUCCESS to the client (it was committed) - we have broken the durability agreement for that op. Problem 2: PRE(persistMajority), CMT, PRE(), ABORT, SET may, after the abort has been purged be sent as: SET and we have no way of knowing a durability op was ever present. Solution: Advance the HPS to snapshotEndSeqno and seqnoAck once a full snapshot is persisted, _just in case_ any prepares were deduped. We could ack the latest prepare we are tracking, but we may be unaware of some prepares due to Problem 2, so ack snapshotEndSeqno - any prepares before this seqno are definitely prepared anyway. Change-Id: I9822ea608da79e4ac55f8f4f42cabe545e26adb6 Reviewed-on: http://review.couchbase.org/111643 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 3ac1da9 commit 4834fe3

File tree

9 files changed

+326
-58
lines changed

9 files changed

+326
-58
lines changed

engines/ep/src/dcp/passive_stream.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -948,17 +948,23 @@ void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
948948
notifyStreamReady();
949949
cur_snapshot_ack = false;
950950
}
951-
cur_snapshot_type.store(Snapshot::None);
952951

953952
// Notify the PassiveDM that the snapshot-end mutation has been
954953
// received on PassiveStream, if the snapshot contains at least one
955954
// Prepare. That is necessary for unblocking the High Prepared Seqno
956955
// in PassiveDM. Note that the HPS is what the PassiveDM acks back to
957956
// the Active. See comments in PassiveDM for details.
958-
if (cur_snapshot_prepare) {
957+
958+
// Disk snapshots are subject to deduplication, and may be missing
959+
// purged aborts. We must notify the PDM even if we have not seen a
960+
// prepare, to account for possible unseen prepares.
961+
if (cur_snapshot_prepare ||
962+
cur_snapshot_type.load() == Snapshot::Disk) {
959963
vb->notifyPassiveDMOfSnapEndReceived(byseqno);
960964
cur_snapshot_prepare.store(false);
961965
}
966+
967+
cur_snapshot_type.store(Snapshot::None);
962968
}
963969
}
964970

engines/ep/src/durability/durability_monitor_impl.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,11 @@ bool DurabilityMonitor::ReplicationChain::hasAcked(const std::string& node,
323323

324324
return itr->second.lastWriteSeqno >= bySeqno;
325325
}
326+
327+
bool operator>(const SnapshotEndInfo& a, const SnapshotEndInfo& b) {
328+
return a.seqno > b.seqno;
329+
}
330+
std::string to_string(const SnapshotEndInfo& snapshotEndInfo) {
331+
return std::to_string(snapshotEndInfo.seqno) + "{" +
332+
to_string(snapshotEndInfo.type) + "}";
333+
}

engines/ep/src/durability/durability_monitor_impl.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,23 @@ struct DurabilityMonitor::ReplicationChain {
261261
const DurabilityMonitor::ReplicationChainName name;
262262
};
263263

264+
// Used to track information necessary for the PassiveDM to correctly ACK
265+
// at the end of snapshots
266+
struct SnapshotEndInfo {
267+
int64_t seqno;
268+
CheckpointType type;
269+
};
270+
271+
/**
272+
* Compares two SnapshotEndInfos by seqno.
273+
*
274+
* Only provides a partial ordering; SnapshotEndInfos with the same seqno
275+
* but different checkpoint type would be arbitrarily ordered if using this
276+
* comparator
277+
*/
278+
bool operator>(const SnapshotEndInfo& a, const SnapshotEndInfo& b);
279+
std::string to_string(const SnapshotEndInfo& snapshotEndInfo);
280+
264281
/*
265282
* This class embeds the state of a PDM. It has been designed for being
266283
* wrapped by a folly::Synchronized<T>, which manages the read/write
@@ -340,11 +357,11 @@ struct PassiveDurabilityMonitor::State {
340357
// PersistToMajority
341358
Position highPreparedSeqno;
342359

343-
// Queue of snapEndSeqnos for snapshots which have been received entirely
344-
// by the (owning) replica/pending VBucket.
360+
// Queue of SnapshotEndInfos for snapshots which have been received entirely
361+
// by the (owning) replica/pending VBucket, and the type of the checkpoint.
345362
// Used for implementing the correct move-logic of High Prepared Seqno.
346363
// Must be pushed to at snapshot-end received on PassiveStream.
347-
MonotonicQueue<uint64_t> receivedSnapshotEndSeqnos;
364+
MonotonicQueue<SnapshotEndInfo> receivedSnapshotEnds;
348365

349366
// Cumulative count of accepted (tracked) SyncWrites.
350367
size_t totalAccepted = 0;

engines/ep/src/durability/passive_durability_monitor.cc

Lines changed: 90 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ void PassiveDurabilityMonitor::notifySnapshotEndReceived(uint64_t snapEnd) {
131131
int64_t hps{0};
132132
{
133133
auto s = state.wlock();
134-
s->receivedSnapshotEndSeqnos.push(snapEnd);
134+
s->receivedSnapshotEnds.push({int64_t(snapEnd),
135+
vb.isReceivingDiskSnapshot()
136+
? CheckpointType::Disk
137+
: CheckpointType::Memory});
135138
// Maybe the new tracked Prepare is already satisfied and could be
136139
// ack'ed back to the Active.
137140
prevHps = s->highPreparedSeqno.lastWriteSeqno;
@@ -307,6 +310,7 @@ void PassiveDurabilityMonitor::State::updateHighPreparedSeqno() {
307310
// at PDM under the following constraints:
308311
//
309312
// (1) Nothing is ack'ed before the complete snapshot is received
313+
// (I.e., do nothing if receivedSnapshotEnds is empty)
310314
//
311315
// (2) Majority and MajorityAndPersistOnMaster Prepares (which don't need to
312316
// be persisted for being locally satisfied) may be satisfied as soon as
@@ -319,6 +323,12 @@ void PassiveDurabilityMonitor::State::updateHighPreparedSeqno() {
319323
// (4) The durability-fence can move (ie, PersistToMajority Prepares are
320324
// locally-satisfied) only when the complete snapshot is persisted.
321325
//
326+
// (5) Once a disk snapshot is fully persisted, the HPS is advanced to the
327+
// snapshot end - even if no prepares were seen during the snapshot
328+
// or if trackedWrites is empty. This accounts for deduping; there may
329+
// have been prepares we have not seen, but they are definitely
330+
// satisfied (they are persisted) and should be acked.
331+
//
322332
// This function implements all the logic necessary for moving the HPS by
323333
// enforcing the rules above. The function is called:
324334
//
@@ -334,16 +344,6 @@ void PassiveDurabilityMonitor::State::updateHighPreparedSeqno() {
334344
// durability-fence. As already mentioned, we can move the
335345
// durability-fence only if the complete snapshot is persisted.
336346

337-
if (trackedWrites.empty()) {
338-
return;
339-
}
340-
341-
if (receivedSnapshotEndSeqnos.empty()) {
342-
// We have not received a full snapshot, we cannot advance the hps at
343-
// all
344-
return;
345-
}
346-
347347
const auto prevHPS = highPreparedSeqno.lastWriteSeqno;
348348

349349
// Helper to keep conditions short and meaningful
@@ -355,49 +355,87 @@ void PassiveDurabilityMonitor::State::updateHighPreparedSeqno() {
355355
snapshotEndSeqno;
356356
};
357357

358-
while (!receivedSnapshotEndSeqnos.empty() && !trackedWrites.empty()) {
359-
uint64_t snapshotEndSeqno = receivedSnapshotEndSeqnos.front();
360-
361-
// ** If pdm.vb.getPersistenceSeqno() >= snapshotEndSeqno
362-
// we have received and persisted an entire snapshot
363-
// All prepares from this snapshot are satisfied and the state
364-
// is consistent at snap end. The HPS can advance over Prepares of
365-
// PersistToMajority or lower (i.e., everything currently)
366-
367-
// ** if pdm.vb.getPersistenceSeqno() < snapshotEndSeqno
368-
// we have received but NOT persisted an entire snapshot
369-
// We *may* be able to advance the HPS part way
370-
// into this snapshot - The HPS can be advanced over all Prepares of
371-
// MajorityAndPersistOnMaster level or lower, to the last Prepare
372-
// immediately preceding an *unpersisted* Prepare with Level ==
373-
// PersistToMajority. We cannot move the HPS past this Prepare until
374-
// it *is* persisted.
375-
376-
const cb::durability::Level maxLevelCanAdvanceOver =
377-
(pdm.vb.getPersistenceSeqno() >= snapshotEndSeqno)
378-
? cb::durability::Level::PersistToMajority
379-
: cb::durability::Level::MajorityAndPersistOnMaster;
380-
381-
for (auto next = getIteratorNext(highPreparedSeqno.it);
382-
inSnapshot(snapshotEndSeqno, next) &&
383-
next->getDurabilityReqs().getLevel() <= maxLevelCanAdvanceOver;
384-
next = getIteratorNext(highPreparedSeqno.it)) {
385-
// Note: Update last-write-seqno first to enforce monotonicity and
386-
// avoid any state-change if monotonicity checks fail
387-
highPreparedSeqno.lastWriteSeqno = next->getBySeqno();
388-
highPreparedSeqno.it = next;
358+
while (!receivedSnapshotEnds.empty()) {
359+
const auto snapshotEnd = receivedSnapshotEnds.front();
360+
361+
const bool snapshotFullyPersisted =
362+
static_cast<int64_t>(pdm.vb.getPersistenceSeqno()) >=
363+
snapshotEnd.seqno;
364+
365+
const bool isDiskSnapshot = snapshotEnd.type == CheckpointType::Disk;
366+
367+
using namespace cb::durability;
368+
369+
Level maxLevelCanAdvanceOver{};
370+
371+
if (snapshotFullyPersisted) {
372+
// we have received and persisted an entire snapshot
373+
// All prepares from this snapshot are satisfied and the state
374+
// is consistent at snap end. The HPS can advance over Prepares of
375+
// PersistToMajority or lower (i.e., everything currently)
376+
maxLevelCanAdvanceOver = Level::PersistToMajority;
377+
} else if (!isDiskSnapshot) {
378+
// we have received but NOT persisted an entire snapshot
379+
// We *may* be able to advance the HPS part way
380+
// into this snapshot - The HPS can be advanced over all Prepares of
381+
// MajorityAndPersistOnMaster level or lower, to the last Prepare
382+
// immediately preceding an *unpersisted* Prepare with Level ==
383+
// PersistToMajority. We cannot move the HPS past this Prepare until
384+
// it *is* persisted.
385+
maxLevelCanAdvanceOver = Level::MajorityAndPersistOnMaster;
386+
} else {
387+
// we have received but NOT persisted an entire *DISK* snapshot
388+
// we cannot ack anything until the entire snapshot has been
389+
// persisted because PersistToMajority level Prepares may have been
390+
// deduped by lower level prepares.
391+
// Therefore, the HPS cannot advance over *any* prepares.
392+
maxLevelCanAdvanceOver = Level::None;
389393
}
390-
// Check if we finished an entire snapshot, and might be able to
391-
// continue checking the next one.
392-
if (inSnapshot(snapshotEndSeqno,
393-
getIteratorNext(highPreparedSeqno.it))) {
394-
// we stopped advancing the HPS before the end of a snapshot
395-
// because we reached a PersistToMajority Prepare
396-
// HPS now points to the last Prepare before any
397-
// PersistToMajority
398-
break;
394+
395+
// Advance the HPS, respecting maxLevelCanAdvanceOver
396+
if (!trackedWrites.empty()) {
397+
for (auto next = getIteratorNext(highPreparedSeqno.it);
398+
inSnapshot(snapshotEnd.seqno, next) &&
399+
next->getDurabilityReqs().getLevel() <= maxLevelCanAdvanceOver;
400+
next = getIteratorNext(highPreparedSeqno.it)) {
401+
// Note: Update last-write-seqno first to enforce monotonicity
402+
// and avoid any state-change if monotonicity checks fail
403+
highPreparedSeqno.lastWriteSeqno = next->getBySeqno();
404+
highPreparedSeqno.it = next;
399405
}
400-
receivedSnapshotEndSeqnos.pop();
406+
}
407+
408+
if (isDiskSnapshot && snapshotFullyPersisted) {
409+
// Special case - prepares in disk snapshots may have been
410+
// deduplicated.
411+
// PRE(persistMajority), CMT, PRE(), ABORT, SET
412+
// may, after the abort has been purged be sent as:
413+
// SET
414+
// We would have no prepare for this op, but we still need to
415+
// seqno ack something. To resolve this, advance the HPS seqno to
416+
// the snapshotEndSeqno. There may not be an associated prepare.
417+
// NB: lastWriteSeqno is NOT guaranteed to match
418+
// highPreparedSeqno.it->getBySeqno()
419+
// because of this case
420+
highPreparedSeqno.lastWriteSeqno = snapshotEnd.seqno;
421+
}
422+
423+
// Check if we could have acked everything within the snapshot and
424+
// might be able to continue checking the next one.
425+
if ((isDiskSnapshot && !snapshotFullyPersisted) ||
426+
inSnapshot(snapshotEnd.seqno,
427+
getIteratorNext(highPreparedSeqno.it))) {
428+
// Either we have not fully persisted a disk snapshot and
429+
// the HPS is left <= the start of this snapshot
430+
// OR
431+
// we stopped advancing the HPS before the end of a memory
432+
// snapshot because we reached a PersistToMajority Prepare
433+
// HPS now points to the last Prepare before any
434+
// PersistToMajority
435+
break;
436+
}
437+
438+
receivedSnapshotEnds.pop();
401439
}
402440

403441
// We have now acked all the complete, persisted snapshots we received,

engines/ep/src/ep_types.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ std::string to_string(TrackCasDrift trackCasDrift) {
6767
std::to_string(static_cast<TrackCasDriftUType>(trackCasDrift)));
6868
}
6969

70+
std::string to_string(CheckpointType checkpointType) {
71+
switch (checkpointType) {
72+
case CheckpointType::Disk:
73+
return "Disk";
74+
case CheckpointType::Memory:
75+
return "Memory";
76+
}
77+
folly::assume_unreachable();
78+
}
79+
7080
std::string to_string(HighPriorityVBNotify hpNotifyType) {
7181
using HighPriorityVBNotifyUType =
7282
std::underlying_type<HighPriorityVBNotify>::type;

engines/ep/src/ep_types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ std::ostream& operator<<(std::ostream&, TransferVB transfer);
6363
std::string to_string(GenerateBySeqno generateBySeqno);
6464
std::string to_string(GenerateCas generateCas);
6565
std::string to_string(TrackCasDrift trackCasDrift);
66+
std::string to_string(CheckpointType checkpointType);
6667

6768
struct snapshot_range_t {
6869
snapshot_range_t(uint64_t start, uint64_t end) : start(start), end(end) {

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3829,6 +3829,8 @@ static enum test_result test_dcp_consumer_takeover(EngineIface* h) {
38293829
"Failed to send dcp mutation");
38303830
}
38313831

3832+
wait_for_flusher_to_settle(h);
3833+
38323834
dcp->snapshot_marker(cookie, stream_opaque, Vbid(0), 6, 10, 10);
38333835
for (int i = 6; i <= 10; i++) {
38343836
const std::string key{"key" + std::to_string(i)};
@@ -3852,18 +3854,32 @@ static enum test_result test_dcp_consumer_takeover(EngineIface* h) {
38523854
"Failed to send dcp mutation");
38533855
}
38543856

3857+
wait_for_flusher_to_settle(h);
3858+
38553859
wait_for_stat_to_be(h, "eq_dcpq:unittest:stream_0_buffer_items", 0, "dcp");
38563860

38573861
dcp_step(h, cookie, producers);
38583862
cb_assert(producers.last_op == cb::mcbp::ClientOpcode::DcpSnapshotMarker);
38593863
cb_assert(producers.last_status == cb::mcbp::Status::Success);
38603864
cb_assert(producers.last_opaque != opaque);
38613865

3866+
dcp_step(h, cookie, producers);
3867+
cb_assert(producers.last_op ==
3868+
cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged);
3869+
cb_assert(producers.last_status == cb::mcbp::Status::Success);
3870+
cb_assert(producers.last_opaque != opaque);
3871+
38623872
dcp_step(h, cookie, producers);
38633873
cb_assert(producers.last_op == cb::mcbp::ClientOpcode::DcpSnapshotMarker);
38643874
cb_assert(producers.last_status == cb::mcbp::Status::Success);
38653875
cb_assert(producers.last_opaque != opaque);
38663876

3877+
dcp_step(h, cookie, producers);
3878+
cb_assert(producers.last_op ==
3879+
cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged);
3880+
cb_assert(producers.last_status == cb::mcbp::Status::Success);
3881+
cb_assert(producers.last_opaque != opaque);
3882+
38673883
testHarness->destroy_cookie(cookie);
38683884

38693885
return SUCCESS;

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,11 @@ void DurabilityPassiveStreamTest::
868868
// 4) Verify doc state
869869
auto vb = store->getVBucket(vbid);
870870
ASSERT_TRUE(vb);
871+
872+
// tell the DM that this snapshot was persisted
873+
vb->setPersistenceSeqno(streamStartSeqno);
874+
vb->notifyPersistenceToDurabilityMonitor();
875+
871876
{
872877
// findForCommit will return both pending and committed perspectives
873878
auto res = vb->ht.findForCommit(key);

0 commit comments

Comments
 (0)