
Commit f406696

jimwwalker authored and daverigby committed
MB-35003: Set fail-over seqno to be the end seqno of previous complete snapshot
This commit changes how the flushVBucket function updates the current snapshot range for pending/replica vbuckets only. Note the flusher is shared between active/pending/replica vbuckets and as such these changes are live for all vbucket states; however, active vbuckets always overwrite the snapshot range with the high flushed seqno.

This description first covers some examples of how the flusher managed the snapshot range prior to the change. *Note*: in all of these examples the current/final snapshot is partially received.

ex1: Before this commit the flusher very much followed what the checkpoint manager told it, e.g. a partially received snapshot {4,8} resulted in the following disk state:

    vbstate.range = {4,8}
    seqno index = 1,2,3,4,5,6

ex2: Before this commit multiple checkpoint snapshots could be flushed as a combined set of items, e.g. receipt of snapshots {1,3} and {4,8} followed by a flush resulted in the following disk state:

    vbstate.range = {1,8}
    seqno index = 1,2,3,4,5,6

ex3: A further important example comes from MB-35003 itself: when the producer switches from 'backfill' to 'in-memory', the first in-memory snapshot is now tagged with the 'checkpoint' flag, which was introduced in MB-35001. Before MB-35001 a disk snapshot followed by a memory snapshot looked as follows: {1,3, disk} and {4,8, memory}. When a snapshot is received without the checkpoint flag, the snapshot items just enter the current checkpoint, so once the flusher is past the {1,3} snapshot the subsequent snapshot is simply an extension of the current one. Tagging snapshots with the checkpoint flag means a new checkpoint is opened, which can yield a new outcome. With MB-35001, the addition of the checkpoint flag means the following is now a possible outcome of the flusher:

    vbstate.range = {5,8}
    seqno index = 1,2,3,4,5,6

An important outcome of this commit is what happens during replica to active promotion, and where we set the seqno of the fail-over table entry. The logic is as follows:

    if (highSeqno == vbstate.range.end) {
        newEntry.seqno = highSeqno;
    } else {
        newEntry.seqno = vbstate.range.start;
    }

With the examples above, promotion to active yields the following new fail-over entry seqno:

    ex1: 4
    ex2: 1
    ex3: 5

In all of these examples, because of the partial snapshot, the fail-over entry seqno is always the start of a snapshot, and in ex3 it is the start of an artificial checkpoint.

This commit changes the outcome of all of these examples: instead of the start of the partial snapshot, the fail-over entry seqno becomes the end of the last complete snapshot. To achieve this the flusher now gets more information about the set of items it is flushing. The checkpoint manager is changed so that the flusher receives:

* The entire set of items to flush.
* A vector of snapshot_range_t, one for each individual checkpoint that makes up the set of items.

As the flusher iterates through the set of items to flush, the seqno of each flushed item is compared against the end seqno of each snapshot. If there is a match, the flusher concludes it has all the items of that particular snapshot and can now change the start seqno of the vbucket's range to be that of the completed snapshot. Each example from above now changes to have the following outcomes.
    ex1: vbstate.range = {3,8}  seqno index = 1,2,3,4,5,6  fail-over = 3
    ex2: vbstate.range = {3,8}  seqno index = 1,2,3,4,5,6  fail-over = 3
    ex3: vbstate.range = {3,8}  seqno index = 1,2,3,4,5,6  fail-over = 3

A final notable difference that this commit makes is that once the flusher has absolutely flushed to the very end of the range, the state now looks as follows:

    vbstate.range = {8,8}
    seqno index = 1,2,3,4,5,6,7,8

I.e. start = end = high-seqno. There seems to be no advantage to adapting the flusher further to maintain the prior behaviour, where in the fully flushed scenario range.start reflected some lower value: in all failure and fail-over scenarios, when the high-seqno matches range.end, range.start is not used.

Change-Id: I54e3851378a9e19ad350fc17741fa19dfa69b2fa
Reviewed-on: http://review.couchbase.org/113433
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Dave Rigby <[email protected]>
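The promotion logic quoted in the commit message can be illustrated with a small standalone C++ sketch. The names below (Range, failoverEntrySeqno) are illustrative only, not the actual VBucket or failover-table API:

    #include <cstdint>
    #include <iostream>

    // Hypothetical, simplified view of the persisted snapshot range.
    struct Range {
        uint64_t start;
        uint64_t end;
    };

    // Sketch of the fail-over entry seqno selection described above: if the
    // vbucket has flushed right up to the end of the persisted range, the
    // high seqno itself is a safe fail-over point; otherwise fall back to
    // range.start, which after this commit is the end of the last complete
    // snapshot.
    uint64_t failoverEntrySeqno(uint64_t highSeqno, Range persisted) {
        return (highSeqno == persisted.end) ? highSeqno : persisted.start;
    }

    int main() {
        // ex1-ex3 after this commit: range {3,8}, high seqno 6 -> entry 3.
        std::cout << failoverEntrySeqno(6, {3, 8}) << "\n";
        // Fully flushed case: range {8,8}, high seqno 8 -> entry 8.
        std::cout << failoverEntrySeqno(8, {8, 8}) << "\n";
        return 0;
    }

Before this change the same promotion logic applied to the old persisted ranges would have produced 4, 1 and 5 respectively, as listed in the commit message.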
1 parent 3ad8757 commit f406696

13 files changed: +489 −108 lines changed


docs/dcp/documentation/commands/snapshot-marker.md

Lines changed: 44 additions & 0 deletions

@@ -127,3 +127,47 @@ If data in this packet is malformed or incomplete then this error is returned.
 **(Disconnect)**
 
 If this message is sent to a connection that is not a consumer.
+
+### Implementation notes
+
+The implementation of DCP has led to some inconsistencies in the way that the
+snapshot marker assigns the value of "Start Seqno", depending on the context.
+
+Note that [stream-request](stream-request.md) defines "Start Seqno" to be the
+maximum sequence number that the client has received. A request with a start
+seqno of X means "I have X, please start my stream at the sequence number
+after X".
+
+#### Memory snapshot-marker.start-seqno equals seqno of first transmitted mutation
+
+A stream which is transferring in-memory checkpoint data sets the
+`snapshot-marker.start-seqno` to the seqno of the first mutation that will
+follow the marker. This matches the semantics of stream-request, where the
+start-seqno is something the client already has.
+
+Thus a client which performs a stream-request with a start-seqno of X, where
+due to de-duplication X+n is the first sequence number available (from
+memory), will receive:
+
+* TX `stream-request{start-seqno=X}`
+* RX `stream-request-response{success}`
+* RX `snapshot-marker{start=X+n, end=Y, flags=0x1}`
+* RX `mutation{seqno:X+n}`
+
+#### Disk snapshot-marker.start-seqno equals stream-request.start-seqno
+
+The difference here is that when a stream-request has to backfill from disk,
+the `0x02 disk` snapshot marker has its start-seqno set to the client's
+requested start-seqno. The returned mutations are correct per the definition
+of stream-request, but the snapshot-marker could be viewed as inconsistent
+with the stream-request definition and with the in-memory case. For example:
+
+* TX `stream-request{start-seqno=X}`
+* RX `stream-request-response{success}`
+* RX `snapshot-marker{start=X, end=Y, flags=0x2}`
+* RX `mutation{seqno:X+n}`
+
+Note: A stream could at any time switch from memory to disk if the client is
+deemed to be slow.
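A minimal sketch of how a consumer might interpret the marker's start-seqno for the two cases documented above. This is not the actual DCP client code; the SnapshotMarker struct and describeMarker function are assumed names, only the 0x1 (memory) and 0x2 (disk) flag values come from the documentation:

    #include <cstdint>
    #include <iostream>

    // Hypothetical marker representation; flag values match the documented
    // 0x1 (memory) and 0x2 (disk) snapshot-marker flags.
    struct SnapshotMarker {
        uint64_t start;
        uint64_t end;
        uint32_t flags;
    };

    // Given the start-seqno the client asked for, report how the marker's
    // start-seqno should be read per the notes above.
    void describeMarker(uint64_t requestedStart, const SnapshotMarker& m) {
        if (m.flags & 0x2) {
            // Disk backfill: start-seqno echoes the client's requested
            // seqno, which the client already has.
            std::cout << "disk snapshot, start==requested (" << m.start
                      << ")\n";
        } else {
            // Memory: start-seqno is the seqno of the first mutation that
            // will follow the marker (may be > requested due to
            // de-duplication).
            std::cout << "memory snapshot, first mutation at seqno "
                      << m.start << " (requested " << requestedStart
                      << ")\n";
        }
    }

    int main() {
        describeMarker(10, {13, 20, 0x1}); // memory: first mutation is 13
        describeMarker(10, {10, 20, 0x2}); // disk: start echoes the request
        return 0;
    }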

engines/ep/src/checkpoint_manager.cc

Lines changed: 29 additions & 21 deletions

@@ -833,9 +833,7 @@ void CheckpointManager::queueSetVBState(VBucket& vb) {
 
 CheckpointManager::ItemsForCursor CheckpointManager::getNextItemsForCursor(
         CheckpointCursor* cursor, std::vector<queued_item>& items) {
-    auto result = getItemsForCursor(
-            cursor, items, std::numeric_limits<size_t>::max());
-    return result;
+    return getItemsForCursor(cursor, items, std::numeric_limits<size_t>::max());
 }
 
 CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
@@ -844,20 +842,20 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
         size_t approxLimit) {
     LockHolder lh(queueLock);
     if (!cursorPtr) {
-        EP_LOG_WARN("getNextItemsForCursor(): Caller had a null cursor {}",
+        EP_LOG_WARN("getItemsForCursor(): Caller had a null cursor {}",
                     vbucketId);
-        return {0, 0};
+        return {};
    }
 
     auto& cursor = *cursorPtr;
 
     // Fetch whole checkpoints; as long as we don't exceed the approx item
     // limit.
-    ItemsForCursor result((*cursor.currentCheckpoint)->getSnapshotStartSeqno(),
-                          (*cursor.currentCheckpoint)->getSnapshotEndSeqno(),
-                          (*cursor.currentCheckpoint)->getCheckpointType());
+    ItemsForCursor result((*cursor.currentCheckpoint)->getCheckpointType(),
+                          (*cursor.currentCheckpoint)->getHighCompletedSeqno());
 
     size_t itemCount = 0;
+    bool enteredNewCp = true;
     while ((result.moreAvailable = incrCursor(cursor))) {
         // We only want to return items from contiguous checkpoints with the
         // same type. We should not return Memory checkpoint items followed by
@@ -868,12 +866,20 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
             result.checkpointType) {
             break;
         }
+        if (enteredNewCp) {
+            result.ranges.push_back(
+                    {(*cursor.currentCheckpoint)->getSnapshotStartSeqno(),
+                     (*cursor.currentCheckpoint)->getSnapshotEndSeqno()});
+            enteredNewCp = false;
+        }
 
         queued_item& qi = *(cursor.currentPos);
         items.push_back(qi);
         itemCount++;
 
         if (qi->getOperation() == queue_op::checkpoint_end) {
+            enteredNewCp = true; // the next incrCursor will move to a new CP
+
             // Only move the HCS at checkpoint end (don't want to flush a
             // HCS mid-checkpoint).
             result.highCompletedSeqno =
@@ -883,8 +889,6 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
             // our limit.
             if (itemCount >= approxLimit) {
                 // Reached our limit - don't want any more items.
-                result.range.setEnd(
-                        (*cursor.currentCheckpoint)->getSnapshotEndSeqno());
 
                 // However, we *do* want to move the cursor into the next
                 // checkpoint if possible; as that means the checkpoint we just
@@ -894,19 +898,23 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
                 break;
             }
         }
-        // May have moved into a new checkpoint - update range.end.
-        result.range.setEnd((*cursor.currentCheckpoint)->getSnapshotEndSeqno());
     }
 
-    EP_LOG_DEBUG(
-            "CheckpointManager::getNextItemsForCursor() "
-            "cursor:{} result:{{#items:{} range:{{{}, {}}} "
-            "moreAvailable:{}}}",
-            cursor.name,
-            uint64_t(itemCount),
-            result.range.getStart(),
-            result.range.getEnd(),
-            result.moreAvailable ? "true" : "false");
+    if (globalBucketLogger->should_log(spdlog::level::debug)) {
+        std::stringstream ranges;
+        for (const auto& range : result.ranges) {
+            ranges << "{" << range.getStart() << "," << range.getEnd() << "}";
+        }
+        EP_LOG_DEBUG(
+                "CheckpointManager::getItemsForCursor() "
+                "cursor:{} result:{{#items:{} ranges:size:{} {} "
+                "moreAvailable:{}}}",
+                cursor.name,
+                uint64_t(itemCount),
+                result.ranges.size(),
+                ranges.str(),
+                result.moreAvailable ? "true" : "false");
+    }
 
     cursor.numVisits++;
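The per-checkpoint range collection added above can be pictured with a simplified standalone sketch; Range, Checkpoint and collectRanges below are stand-ins for illustration, not the real CheckpointManager types:

    #include <cstdint>
    #include <vector>

    // Simplified stand-ins: only what is needed to show the idea of one
    // snapshot_range_t being recorded per checkpoint visited.
    struct Range {
        uint64_t start;
        uint64_t end;
    };
    struct Checkpoint {
        Range snapshot;
        std::vector<uint64_t> itemSeqnos; // last item is the checkpoint_end
    };

    // Mirrors the accumulation getItemsForCursor() now performs: a range is
    // pushed the first time the cursor enters each checkpoint, so the caller
    // receives one range per checkpoint contributing to the item set.
    std::vector<Range> collectRanges(const std::vector<Checkpoint>& cps,
                                     std::vector<uint64_t>& items) {
        std::vector<Range> ranges;
        for (const auto& cp : cps) {
            ranges.push_back(cp.snapshot); // entered a new checkpoint
            for (auto seqno : cp.itemSeqnos) {
                items.push_back(seqno);
            }
        }
        return ranges;
    }

    int main() {
        std::vector<uint64_t> items;
        // Checkpoints {1,3} and {4,8}: the caller gets both ranges plus
        // all six items, rather than a single merged {1,8} range.
        auto ranges = collectRanges(
                {{{1, 3}, {1, 2, 3}}, {{4, 8}, {4, 5, 6}}}, items);
        return ranges.size() == 2 ? 0 : 1;
    }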

engines/ep/src/checkpoint_manager.h

Lines changed: 7 additions & 8 deletions

@@ -55,17 +55,16 @@ class CheckpointManager {
 public:
     typedef std::shared_ptr<Callback<Vbid>> FlusherCallback;
 
-    /// Return type of getItemsForCursor()
+    /// Return type of getNextItemsForCursor()
     struct ItemsForCursor {
-        ItemsForCursor(uint64_t start,
-                       uint64_t end,
-                       CheckpointType checkpointType = CheckpointType::Memory,
-                       boost::optional<uint64_t> highCompletedSeqno = {})
-            : range(start, end),
-              checkpointType(checkpointType),
+        ItemsForCursor() {
+        }
+        ItemsForCursor(CheckpointType checkpointType,
+                       boost::optional<uint64_t> highCompletedSeqno)
+            : checkpointType(checkpointType),
              highCompletedSeqno(highCompletedSeqno) {
         }
-        snapshot_range_t range;
+        std::vector<snapshot_range_t> ranges;
        bool moreAvailable = {false};
        CheckpointType checkpointType = CheckpointType::Memory;
engines/ep/src/ep_bucket.cc

Lines changed: 36 additions & 8 deletions

@@ -364,7 +364,8 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
         // a single flush.
         auto toFlush = vb->getItemsToPersist(flusherBatchSplitTrigger);
         auto& items = toFlush.items;
-        auto& range = toFlush.range;
+        // The range becomes initialised only when an item is flushed
+        boost::optional<snapshot_range_t> range;
         moreAvailable = toFlush.moreAvailable;
 
         KVStore* rwUnderlying = getRWUnderlying(vb->getId());
@@ -403,8 +404,6 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
             uint64_t maxSeqno = 0;
             auto minSeqno = std::numeric_limits<uint64_t>::max();
 
-            range.setStart(std::max(range.getStart(), vbstate.lastSnapStart));
-
             bool mustCheckpointVBState = false;
 
             Collections::VB::Flush collectionFlush(vb->getManifest());
@@ -492,6 +491,35 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
                     }
                     ++stats.flusher_todo;
 
+                    if (!range.is_initialized()) {
+                        range = snapshot_range_t{
+                                vbstate.lastSnapStart,
+                                toFlush.ranges.empty()
+                                        ? vbstate.lastSnapEnd
+                                        : toFlush.ranges.back().getEnd()};
+                    }
+
+                    // Is the item the end item of one of the ranges we're
+                    // flushing? Note all the work here only affects replica VBs
+                    auto itr = std::find_if(
+                            toFlush.ranges.begin(),
+                            toFlush.ranges.end(),
+                            [&item](auto& range) {
+                                return uint64_t(item->getBySeqno()) ==
+                                       range.getEnd();
+                            });
+
+                    // If this is the end item, we can adjust the start of our
+                    // flushed range, which would be used for failure purposes.
+                    // Primarily by bringing the start to be a consistent point
+                    // allows for promotion to active to set the fail-over table
+                    // to a consistent point.
+                    if (itr != toFlush.ranges.end()) {
+                        // Use std::max as the flusher is not visiting in seqno
+                        // order.
+                        range->setStart(
+                                std::max(range->getStart(), itr->getEnd()));
+                    }
                 } else {
                     // Item is the same key as the previous[1] one - don't need
                     // to flush to disk.
@@ -521,9 +549,9 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
 
             // only update the snapshot range if items were flushed, i.e.
            // don't appear to be in a snapshot when you have no data for it
-            if (items_flushed) {
-                vbstate.lastSnapStart = range.getStart();
-                vbstate.lastSnapEnd = range.getEnd();
+            if (range) {
+                vbstate.lastSnapStart = range->getStart();
+                vbstate.lastSnapEnd = range->getEnd();
             }
             // Track the lowest seqno written in spock and record it as
             // the HLC epoch, a seqno which we can be sure the value has a
@@ -582,8 +610,8 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
         if (vb->rejectQueue.empty()) {
             // only update the snapshot range if items were flushed, i.e.
             // don't appear to be in a snapshot when you have no data for it
-            if (items_flushed) {
-                vb->setPersistedSnapshot(range.getStart(), range.getEnd());
+            if (range) {
+                vb->setPersistedSnapshot(*range);
             }
             uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
             if (highSeqno > 0 && highSeqno != vb->getPersistenceSeqno()) {
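The new flusher behaviour can be condensed into a standalone sketch; persistedRangeAfterFlush and Range are illustrative names rather than the ep-engine API, and the sketch assumes the caller supplies the per-checkpoint ranges plus the seqnos actually flushed:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct Range {
        uint64_t start;
        uint64_t end;
    };

    // Sketch of the logic added above for replica/pending vbuckets: start
    // from {lastSnapStart, end-of-last-range}, then every time an item whose
    // seqno equals the end of one of the flushed checkpoints is persisted,
    // pull range.start forward to that completed snapshot's end.
    Range persistedRangeAfterFlush(uint64_t lastSnapStart,
                                   uint64_t lastSnapEnd,
                                   const std::vector<Range>& flushRanges,
                                   const std::vector<uint64_t>& flushedSeqnos) {
        Range range{lastSnapStart,
                    flushRanges.empty() ? lastSnapEnd
                                        : flushRanges.back().end};
        for (auto seqno : flushedSeqnos) {
            for (const auto& r : flushRanges) {
                if (seqno == r.end) {
                    // std::max because the flusher does not visit in seqno
                    // order.
                    range.start = std::max(range.start, r.end);
                }
            }
        }
        return range;
    }

    int main() {
        // Snapshots {1,3} and {4,8} with seqnos 1..6 flushed: result {3,8},
        // matching ex1-ex3 in the commit message.
        auto r = persistedRangeAfterFlush(
                1, 8, {{1, 3}, {4, 8}}, {1, 2, 3, 4, 5, 6});
        return (r.start == 3 && r.end == 8) ? 0 : 1;
    }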

engines/ep/src/ep_types.cc

Lines changed: 8 additions & 0 deletions

@@ -117,3 +117,11 @@ std::ostream& operator<<(std::ostream& os, TransferVB transfer) {
     throw std::invalid_argument("operator<<(TransferVB) unknown value " +
                                 std::to_string(static_cast<int>(transfer)));
 }
+
+std::ostream& operator<<(std::ostream& os, const snapshot_range_t& range) {
+    return os << "{" << range.getStart() << "," << range.getEnd() << "}";
+}
+
+std::ostream& operator<<(std::ostream& os, const snapshot_info_t& info) {
+    return os << "start:" << info.start << ", range:" << info.range;
+}
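For reference, a tiny self-contained example of the text these operators produce; the Range struct here is a local mirror used only to show the "{start,end}" formatting, not the real snapshot_range_t:

    #include <cstdint>
    #include <iostream>

    // Local stand-in with the same fields, purely to show the "{start,end}"
    // text produced by the new operator<< overloads.
    struct Range {
        uint64_t start;
        uint64_t end;
    };

    std::ostream& operator<<(std::ostream& os, const Range& r) {
        return os << "{" << r.start << "," << r.end << "}";
    }

    int main() {
        std::cout << Range{3, 8} << "\n"; // prints {3,8}
        return 0;
    }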

engines/ep/src/ep_types.h

Lines changed: 4 additions & 0 deletions

@@ -113,6 +113,8 @@ struct snapshot_range_t {
     uint64_t end;
 };
 
+std::ostream& operator<<(std::ostream&, const snapshot_range_t&);
+
 struct snapshot_info_t {
     snapshot_info_t(uint64_t start, snapshot_range_t range)
         : start(start), range(range) {
@@ -121,6 +123,8 @@ struct snapshot_info_t {
     snapshot_range_t range;
 };
 
+std::ostream& operator<<(std::ostream&, const snapshot_info_t&);
+
 /**
  * The following options can be specified
  * for retrieving an item for get calls

engines/ep/src/kv_bucket.cc

Lines changed: 1 addition & 1 deletion

@@ -2008,7 +2008,7 @@ void KVBucket::reset() {
         vb->ht.clear();
         vb->checkpointManager->clear(vb->getState());
         vb->resetStats();
-        vb->setPersistedSnapshot(0, 0);
+        vb->setPersistedSnapshot({0, 0});
         EP_LOG_INFO("KVBucket::reset(): Successfully flushed {}", vbid);
     }
 }

engines/ep/src/vbucket.cc

Lines changed: 9 additions & 19 deletions

@@ -209,8 +209,7 @@ VBucket::VBucket(Vbid i,
       initialState(initState),
       purge_seqno(purgeSeqno),
       takeover_backed_up(false),
-      persisted_snapshot_start(lastSnapStart),
-      persisted_snapshot_end(lastSnapEnd),
+      persistedRange(lastSnapStart, lastSnapEnd),
       receivingInitialDiskSnapshot(false),
       rollbackItemCount(0),
       hlc(maxCas,
@@ -240,17 +239,14 @@ VBucket::VBucket(Vbid i,
     setupSyncReplication(replTopology);
 
     EP_LOG_INFO(
-            "VBucket: created {} with state:{} "
-            "initialState:{} lastSeqno:{} lastSnapshot:{{{},{}}} "
-            "persisted_snapshot:{{{},{}}} max_cas:{} uuid:{} topology:{}",
+            "VBucket: created {} with state:{} initialState:{} lastSeqno:{} "
+            "persistedRange:{{{},{}}} max_cas:{} uuid:{} topology:{}",
             id,
             VBucket::toString(state),
             VBucket::toString(initialState),
             lastSeqno,
-            lastSnapStart,
-            lastSnapEnd,
-            persisted_snapshot_start,
-            persisted_snapshot_end,
+            persistedRange.getStart(),
+            persistedRange.getEnd(),
             getMaxCas(),
             failovers ? std::to_string(failovers->getLatestUUID()) : "<>",
             replicationTopology.rlock()->dump());
@@ -406,19 +402,13 @@ VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
         auto _begin_ = std::chrono::steady_clock::now();
         auto ckptItems = checkpointManager->getItemsForPersistence(
                 result.items, ckptMgrLimit);
-        result.range = ckptItems.range;
+        result.ranges = std::move(ckptItems.ranges);
         result.highCompletedSeqno = ckptItems.highCompletedSeqno;
         ckptItemsAvailable = ckptItems.moreAvailable;
         stats.persistenceCursorGetItemsHisto.add(
                 std::chrono::duration_cast<std::chrono::microseconds>(
                         std::chrono::steady_clock::now() - _begin_));
-    } else {
-        // We haven't got sufficient remaining capacity to read items from
-        // CheckpoitnManager, therefore we must assume that there /could/
-        // more data to follow (leaving ckptItemsAvailable true). We also must
-        // ensure the valid snapshot range is returned
-        result.range = checkpointManager->getSnapshotInfo().range;
-    }
+    } // else result.ranges is empty, all items from rejectQueue
 
     // Check if there's any more items remaining.
     result.moreAvailable = !rejectQueue.empty() || ckptItemsAvailable;
@@ -2840,8 +2830,8 @@ void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
                                   uint64_t prevHighSeqno) {
     failovers->pruneEntries(rollbackResult.highSeqno);
     checkpointManager->clear(*this, rollbackResult.highSeqno);
-    setPersistedSnapshot(rollbackResult.snapStartSeqno,
-                         rollbackResult.snapEndSeqno);
+    setPersistedSnapshot(
+            {rollbackResult.snapStartSeqno, rollbackResult.snapEndSeqno});
     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
     checkpointManager->setOpenCheckpointId(1);
     setReceivingInitialDiskSnapshot(false);

engines/ep/src/vbucket.h

Lines changed: 5 additions & 7 deletions

@@ -207,15 +207,14 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
         purge_seqno = to;
     }
 
-    void setPersistedSnapshot(uint64_t start, uint64_t end) {
+    void setPersistedSnapshot(const snapshot_range_t& range) {
         LockHolder lh(snapshotMutex);
-        persisted_snapshot_start = start;
-        persisted_snapshot_end = end;
+        persistedRange = range;
     }
 
     snapshot_range_t getPersistedSnapshot() const {
         LockHolder lh(snapshotMutex);
-        return {persisted_snapshot_start, persisted_snapshot_end};
+        return persistedRange;
     }
 
     uint64_t getMaxCas() const {
@@ -469,7 +468,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
 
     struct ItemsToFlush {
         std::vector<queued_item> items;
-        snapshot_range_t range{0, 0};
+        std::vector<snapshot_range_t> ranges;
         bool moreAvailable = false;
         boost::optional<uint64_t> highCompletedSeqno = {};
     };
@@ -2281,8 +2280,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
     /* snapshotMutex is used to update/read the pair {start, end} atomically,
        but not if reading a single field. */
     mutable std::mutex snapshotMutex;
-    uint64_t persisted_snapshot_start;
-    uint64_t persisted_snapshot_end;
+    snapshot_range_t persistedRange;
 
     /*
     * When a vbucket is in the middle of receiving the initial disk snapshot
