Commit 108151d

MB-54729: DCP disk backfill a CDC stream

Backfill of a CDC stream has 4 possible outcomes.

1) No History.

If no history is retained by the KVStore, the backfill behaves no
differently than it would on 7.1. ByID or BySeqno backfilling will
produce a single snapshot (normal or OSO) from the start to the end of
the disk range.

2) History Available.

If the KVStore indicates that history is available, the following 3
outcomes are possible.

2.1) start >= ScanContext::historyStartSeqno => single snapshot marker

Note this case cannot be encountered with OSO, because OSO requires a
start of 0.

This case occurs when the start seqno is within the retained history.
A single snapshot is produced that includes all seqnos from start to
the end of the seqno index. This will be a "scanAllVersions" scan; the
snapshot is tagged with the "History" flag and tagged to indicate that
it "may contain duplicates".

2.2) start < ScanContext::historyStartSeqno => two snapshots

Note that this case has two outcomes:

* If an OSO snapshot is possible, an OSO snapshot is followed by a
  regular snapshot (tagged as History).
* If OSO is not possible, two regular snapshots occur (differentiated
  by flags).

When the backfill starts below the retained history, DCP will backfill
all of the available sequence data, non-history followed by history.
This requires two DCP snapshots produced from the same disk snapshot.

The first DCP snapshot is the non-history range: a KVStore::scan from
the requested start up to, but not including,
ScanContext::historyStartSeqno. Alternatively it is an OSO snapshot of
the requested collection, covering the entire collection ByID range.

The second DCP snapshot is the history range: a
KVStore::scanAllVersions from ScanContext::historyStartSeqno to the end
of the disk snapshot's sequence index. This snapshot is tagged to
indicate that it's History and that it "may contain duplicates".

Note that when two DCP snapshots are generated (no OSO), both markers
represent the full disk range, including the MVS/HCS. The intention is
to ensure that, if a failure occurs before the end of the disk snapshot
is reached, the client knows it is somewhere in the middle of the full
disk range and can resume DCP with the correct start and snapshot
range.

For example:

The disk snapshot has a range of sequence numbers from 1 to 20. This
is split into two sub-ranges, non-history (nh) and history (h).

    nh{1,10} h{11,20}

If a full backfill of this disk snapshot occurs (DCP stream start is
0), two markers will prefix the transmission of the ranges.

    snapshot marker 1:
      snapshot-range{0, 20}
      mvs = 20, hcs = 20
      flags = disk | checkpoint
    mutations 1 to 10
    snapshot marker 2:
      snapshot-range{0, 20}
      mvs = 20, hcs = 20
      flags = disk | checkpoint | history | may_contain_duplicates
    mutations 11 to 20

If the client disconnected after mutation 10, they would resume with
start=10, range={0,20}, indicating they have a partial snapshot.

Implementation Notes:

The implementation of the "history" range adds a new optional phase to
the DCP backfill state machine. When the backfill transitions into
backfill_state_scanning, the variations of the backfill are checked
for. From here the following paths exist.

1) No History.

    backfill_state_scanning -> backfill_state_complete

In this case the full snapshot is delivered from the
backfill_state_scanning phase.

2.1) Only history.

    backfill_state_scanning -> backfill_state_scanning_history_snapshot

In this case the full snapshot is delivered from the
backfill_state_scanning_history_snapshot phase. The
backfill_state_scanning phase has only inspected the ScanContext and
skipped to history.

2.2) Two snapshots (OSO and regular).

    backfill_state_scanning -> backfill_state_scanning_history_snapshot

In this case both backfill phases are delivering "snapshots" from the
same magma disk snapshot.

Change-Id: I5a6df7ed929d99187a74a071c1d523d904cd6f7e
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/185002
Well-Formed: Restriction Checker
Tested-by: Jim Walker <[email protected]>
Reviewed-by: Paolo Cocchi <[email protected]>
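
The outcomes listed in the commit message can be sketched as a small planning function. This is only an illustrative model: `ScanKind`, `PlannedSnapshot` and `planBackfill` are hypothetical names rather than kv_engine types, and the OSO variant of case 2.2 is left out for brevity.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical model of the backfill outcomes; not the kv_engine types.
enum class ScanKind { Regular, AllVersions };

struct PlannedSnapshot {
    ScanKind kind;
    uint64_t scanStart; // first seqno the scan reads
    uint64_t scanEnd; // last seqno the scan reads
    bool history; // marker tagged history | may_contain_duplicates
};

// historyStart is empty when the KVStore retains no history.
std::vector<PlannedSnapshot> planBackfill(
        uint64_t start,
        uint64_t diskEnd,
        std::optional<uint64_t> historyStart) {
    assert(start <= diskEnd);
    std::vector<PlannedSnapshot> plan;
    if (!historyStart) {
        // 1) No history: one regular scan over the whole range.
        plan.push_back({ScanKind::Regular, start, diskEnd, false});
    } else if (start >= *historyStart) {
        // 2.1) Start is inside the retained history: one history scan.
        plan.push_back({ScanKind::AllVersions, start, diskEnd, true});
    } else {
        // 2.2) Start is below the history: non-history, then history.
        plan.push_back({ScanKind::Regular, start, *historyStart - 1, false});
        plan.push_back({ScanKind::AllVersions, *historyStart, diskEnd, true});
    }
    return plan;
}
```

With the example from the message, `planBackfill(0, 20, 11)` yields a regular scan ending at seqno 10 followed by a history scan starting at seqno 11.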
1 parent d045219 commit 108151d

16 files changed, +772 -72 lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 29 additions & 16 deletions
@@ -279,7 +279,8 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
                                     uint64_t endSeqno,
                                     std::optional<uint64_t> highCompletedSeqno,
                                     uint64_t maxVisibleSeqno,
-                                    std::optional<uint64_t> timestamp) {
+                                    std::optional<uint64_t> timestamp,
+                                    SnapshotSource source) {
     {
         std::unique_lock<std::mutex> lh(streamMutex);
 
@@ -322,7 +323,8 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
         /* We may need to send the requested 'snap_start_seqno_' as the snapshot
            start when we are sending the first snapshot because the first
            snapshot could be resumption of a previous snapshot */
-        startSeqno = adjustStartIfFirstSnapshot(startSeqno);
+        startSeqno = adjustStartIfFirstSnapshot(
+                startSeqno, source != SnapshotSource::NoHistoryPrologue);
 
         VBucketPtr vb = engine->getVBucket(vb_);
         if (!vb) {
@@ -361,24 +363,32 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
         auto mvsToSend = supportSyncReplication()
                                  ? std::make_optional(maxVisibleSeqno)
                                  : std::nullopt;
+
+        auto flags = MARKER_FLAG_DISK | MARKER_FLAG_CHK;
+
+        if (source == SnapshotSource::History) {
+            flags |= (MARKER_FLAG_HISTORY |
+                      MARKER_FLAG_MAY_CONTAIN_DUPLICATE_KEYS);
+        }
+
         log(spdlog::level::level_enum::info,
             "{} ActiveStream::markDiskSnapshot: Sending disk snapshot with "
-            "start {}, end {}, and high completed {}, max visible {}",
+            "start:{}, end:{}, flags:0x{:x}, hcs:{}, mvs:{}",
             logPrefix,
             startSeqno,
             endSeqno,
+            flags,
             to_string_or_none(hcsToSend),
             to_string_or_none(mvsToSend));
-        pushToReadyQ(std::make_unique<SnapshotMarker>(
-                opaque_,
-                vb_,
-                startSeqno,
-                endSeqno,
-                MARKER_FLAG_DISK | MARKER_FLAG_CHK,
-                hcsToSend,
-                mvsToSend,
-                timestamp,
-                sid));
+        pushToReadyQ(std::make_unique<SnapshotMarker>(opaque_,
+                                                      vb_,
+                                                      startSeqno,
+                                                      endSeqno,
+                                                      flags,
+                                                      hcsToSend,
+                                                      mvsToSend,
+                                                      timestamp,
+                                                      sid));
         lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
 
         if (!isDiskOnly()) {
@@ -2343,7 +2353,7 @@ bool ActiveStream::isSeqnoGapAtEndOfSnapshot(uint64_t streamSeqno) const {
 void ActiveStream::sendSnapshotAndSeqnoAdvanced(CheckpointType checkpointType,
                                                 uint64_t start,
                                                 uint64_t end) {
-    start = adjustStartIfFirstSnapshot(start);
+    start = adjustStartIfFirstSnapshot(start, true);
 
     const auto isCkptTypeDisk = isDiskCheckpointType(checkpointType);
     uint32_t flags = isCkptTypeDisk ? MARKER_FLAG_DISK : MARKER_FLAG_MEMORY;
@@ -2364,9 +2374,12 @@ void ActiveStream::sendSnapshotAndSeqnoAdvanced(CheckpointType checkpointType,
     queueSeqnoAdvanced();
 }
 
-uint64_t ActiveStream::adjustStartIfFirstSnapshot(uint64_t start) {
+uint64_t ActiveStream::adjustStartIfFirstSnapshot(uint64_t start,
+                                                  bool isCompleteSnapshot) {
     if (!firstMarkerSent) {
-        firstMarkerSent = true;
+        if (isCompleteSnapshot) {
+            firstMarkerSent = true;
+        }
         return std::min(snap_start_seqno_, start);
     }
     return start;
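
The effect of the new `isCompleteSnapshot` argument can be seen in isolation with a reduced model. The sketch below copies only the logic of `adjustStartIfFirstSnapshot` from the hunk above; `StreamModel` and `snapStartSeqno` are hypothetical stand-ins for `ActiveStream` and `snap_start_seqno_`, and locking and everything else in the real class are omitted.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Reduced stand-in for the two ActiveStream members used by the function.
struct StreamModel {
    uint64_t snapStartSeqno; // models snap_start_seqno_ from the request
    bool firstMarkerSent = false;

    uint64_t adjustStartIfFirstSnapshot(uint64_t start,
                                        bool isCompleteSnapshot) {
        if (!firstMarkerSent) {
            // A NoHistoryPrologue marker passes false here, so the History
            // marker that follows still gets the adjusted start.
            if (isCompleteSnapshot) {
                firstMarkerSent = true;
            }
            return std::min(snapStartSeqno, start);
        }
        return start;
    }
};
```

In this model both markers of a split disk snapshot report the same adjusted start, and only the second one flips `firstMarkerSent`.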

engines/ep/src/dcp/active_stream.h

Lines changed: 28 additions & 2 deletions
@@ -190,6 +190,23 @@ class ActiveStream : public Stream,
 
     void setBackfillRemaining_UNLOCKED(size_t value);
 
+    // The source of the snapshot marker
+    //
+    // History - This is a range which has history, all updates to keys will be
+    //           returned.
+    // NoHistory - A range which does not have history, all keys are the most
+    //             recent updates.
+    // NoHistoryPrologue - This is the NoHistory range from a disk snapshot also
+    //                     also contains History. The backfill will cross from
+    //                     the no-history to history ranges.
+    //
+    // NoHistoryPrologue exists to indicate the case when a disk snapshot
+    // has both History and NoHistory ranges - in this case markDiskSnapshot
+    // for example will get invoked twice by the same source backfill. First
+    // NoHistoryPrologue, second History. This allows ActiveStream to
+    // distinguish from NoHistory which will not transition to History.
+    enum SnapshotSource { History, NoHistory, NoHistoryPrologue };
+
     /**
      * Queues a snapshot marker to be sent - only if there are items in
      * the backfill range which will be sent.
@@ -206,14 +223,16 @@ class ActiveStream : public Stream,
      * @param maxVisibleSeqno seqno of last visible (commit/mutation/system
      * event) item
      * @param timestamp of the disk snapshot (if available)
+     * @param source if the snapshot is a history or non-history snapshot
      * @return If the stream has queued a snapshot marker. If this is false, the
      * stream determined none of the items in the backfill would be sent
      */
     bool markDiskSnapshot(uint64_t startSeqno,
                           uint64_t endSeqno,
                           std::optional<uint64_t> highCompletedSeqno,
                           uint64_t maxVisibleSeqno,
-                          std::optional<uint64_t> timestamp);
+                          std::optional<uint64_t> timestamp,
+                          SnapshotSource source);
 
     /**
      * Queues a single "Out of Seqno Order" marker with the 'start' flag
@@ -759,9 +778,16 @@ class ActiveStream : public Stream,
      * the stream.
      * If firstMarkerSent is false this call will set it to true.
      * @param start a seqno we think should be the snapshot start
+     * @param isCompleteSnapshot a boolean which was added by the History/CDC
+     *        work. This bool should be true for when the snapshot is not spread
+     *        over a >1 markers - which is what CDC can do when it has to send
+     *        a disk snapshot as NoHistory{a,b} followed by History{c,d}. If
+     *        this bool is true, the stream can state that the first snapshot
+     *        has been fully processed (the marker of the first snapshot).
      * @return the snapshot start to use
      */
-    uint64_t adjustStartIfFirstSnapshot(uint64_t start);
+    uint64_t adjustStartIfFirstSnapshot(uint64_t start,
+                                        bool isCompleteSnapshot);
 
     /* The last sequence number queued from memory, but is yet to be
        snapshotted and put onto readyQ */
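
How the three `SnapshotSource` values map onto marker flags and onto the `isCompleteSnapshot` argument can be summarised in a standalone sketch. The bit values below are placeholders chosen for illustration and should not be read as the values in the DCP headers; `markerFlags` and `isCompleteSnapshot` are hypothetical free functions, and a scoped enum is used here where the header declares an unscoped one.

```cpp
#include <cassert>
#include <cstdint>

enum class SnapshotSource { History, NoHistory, NoHistoryPrologue };

// Placeholder bit values for this sketch only.
constexpr uint32_t kFlagDisk = 0x02;
constexpr uint32_t kFlagChk = 0x04;
constexpr uint32_t kFlagHistory = 0x10;
constexpr uint32_t kFlagMayContainDuplicateKeys = 0x20;

// Mirrors the flag selection added to ActiveStream::markDiskSnapshot.
constexpr uint32_t markerFlags(SnapshotSource source) {
    uint32_t flags = kFlagDisk | kFlagChk;
    if (source == SnapshotSource::History) {
        flags |= kFlagHistory | kFlagMayContainDuplicateKeys;
    }
    return flags;
}

// Mirrors the value passed to adjustStartIfFirstSnapshot: only the prologue
// of a split disk snapshot counts as an incomplete snapshot.
constexpr bool isCompleteSnapshot(SnapshotSource source) {
    return source != SnapshotSource::NoHistoryPrologue;
}

// The prologue differs from NoHistory in bookkeeping, not in marker flags.
static_assert(markerFlags(SnapshotSource::NoHistoryPrologue) ==
                      markerFlags(SnapshotSource::NoHistory),
              "prologue and no-history markers carry the same flags");
```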

engines/ep/src/dcp/backfill_by_id_disk.cc

Lines changed: 23 additions & 11 deletions
@@ -93,6 +93,9 @@ backfill_status_t DCPBackfillByIdDisk::create() {
         stream->setDead(cb::mcbp::DcpStreamEndStatus::BackfillFail);
         transitionState(backfill_state_done);
     } else {
+        // Will check if a history scan is required.
+        setupForHistoryScan(*stream, *scanCtx, 0);
+
         bool markerSent = stream->markOSODiskSnapshot(scanCtx->maxSeqno);
         if (markerSent) {
             transitionState(backfill_state_scanning);
@@ -125,11 +128,29 @@ backfill_status_t DCPBackfillByIdDisk::scan() {
         return backfill_success;
     }
 
-    transitionState(backfill_state_completing);
+    if (historyScan) {
+        complete(*stream, false);
+        transitionState(backfill_state_scanning_history_snapshot);
+    } else {
+        transitionState(backfill_state_completing);
+    }
 
     return backfill_success;
 }
 
+void DCPBackfillByIdDisk::complete(ActiveStream& stream, bool cancelled) {
+    stream.completeOSOBackfill(
+            scanCtx->maxSeqno, runtime, scanCtx->diskBytesRead);
+
+    auto severity = cancelled ? spdlog::level::level_enum::info
+                              : spdlog::level::level_enum::debug;
+    stream.log(severity,
+               "({}) Backfill task cid:{} {}",
+               vbid,
+               cid.to_string(),
+               cancelled ? "cancelled" : "finished");
+}
+
 void DCPBackfillByIdDisk::complete(bool cancelled) {
     auto stream = streamPtr.lock();
     if (!stream) {
@@ -143,16 +164,7 @@ void DCPBackfillByIdDisk::complete(bool cancelled) {
         return;
     }
 
-    stream->completeOSOBackfill(
-            scanCtx->maxSeqno, runtime, scanCtx->diskBytesRead);
-
-    auto severity = cancelled ? spdlog::level::level_enum::info
-                              : spdlog::level::level_enum::debug;
-    stream->log(severity,
-                "({}) Backfill task cid:{} {}",
-                vbid,
-                cid.to_string(),
-                cancelled ? "cancelled" : "finished");
+    complete(*stream, cancelled);
 
     transitionState(backfill_state_done);
 }
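
The state choice made at the end of a successful scan in `scan()` reduces to a single branch. A minimal sketch, assuming a hypothetical `BackfillState` enum and `afterScan` helper in place of the real `backfill_state_*` values and `transitionState`:

```cpp
#include <cassert>

// Hypothetical subset of the backfill states named in the diff.
enum class BackfillState {
    Scanning,
    ScanningHistorySnapshot,
    Completing,
    Done
};

// Next state once the non-history (or OSO) scan has finished successfully.
// historyScanPending models "historyScan" being set by setupForHistoryScan.
constexpr BackfillState afterScan(bool historyScanPending) {
    return historyScanPending ? BackfillState::ScanningHistorySnapshot
                              : BackfillState::Completing;
}
```

In the ByID case the diff additionally calls `complete(*stream, false)` before moving to the history phase, so the OSO snapshot is closed before the history snapshot begins.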

engines/ep/src/dcp/backfill_by_id_disk.h

Lines changed: 2 additions & 0 deletions
@@ -50,6 +50,8 @@ class DCPBackfillByIdDisk : public DCPBackfillDisk {
      */
     void complete(bool cancelled) override;
 
+    void complete(ActiveStream& stream, bool cancelled);
+
     /// collection to scan for
     CollectionID cid;
 };

engines/ep/src/dcp/backfill_by_seqno_disk.cc

Lines changed: 43 additions & 11 deletions
@@ -141,6 +141,13 @@ backfill_status_t DCPBackfillBySeqnoDisk::create() {
 
         stream->setDead(cb::mcbp::DcpStreamEndStatus::Rollback);
         transitionState(backfill_state_done);
+        return backfill_success;
+    }
+
+    // Check if a history scan is required or if only a history scan is required
+    if (setupForHistoryScan(*stream, *scanCtx, startSeqno)) {
+        // The scan is completely inside the history window
+        transitionState(backfill_state_scanning_history_snapshot);
     } else {
         bool markerSent = markDiskSnapshot(*stream, *scanCtx, *kvstore);
 
@@ -177,8 +184,12 @@ backfill_status_t DCPBackfillBySeqnoDisk::scan() {
     auto& bySeqnoCtx = dynamic_cast<BySeqnoScanContext&>(*scanCtx);
     switch (kvstore->scan(bySeqnoCtx)) {
     case scan_success:
-        stream->setBackfillScanLastRead(scanCtx->lastReadSeqno);
-        transitionState(backfill_state_completing);
+        if (historyScan) {
+            transitionState(backfill_state_scanning_history_snapshot);
+        } else {
+            stream->setBackfillScanLastRead(scanCtx->lastReadSeqno);
+            transitionState(backfill_state_completing);
+        }
         return backfill_success;
     case scan_again:
         // Scan should run again (e.g. was paused by callback)
@@ -269,11 +280,22 @@ bool DCPBackfillBySeqnoDisk::markDiskSnapshot(ActiveStream& stream,
     if (stream.getFilter().isLegacyFilter()) {
         return markLegacyDiskSnapshot(stream, scanCtx, kvs);
     }
-    return stream.markDiskSnapshot(startSeqno,
-                                   scanCtx.maxSeqno,
-                                   scanCtx.persistedCompletedSeqno,
-                                   scanCtx.maxVisibleSeqno,
-                                   scanCtx.timestamp);
+    // HistoryScan: If a disk snapshot is being "split" into non-history and
+    // history ranges, then the endSeqno of this first range should show the
+    // entire snapshot. E.g.
+    // disk snapshot is [a...d], but split
+    // no-history[a..b]
+    // history[c..d]
+    // Then all of the markers from this backfill stats start:a, end:d and mvs
+    // hcs can only be valid once d is reached.
+    return stream.markDiskSnapshot(
+            startSeqno,
+            historyScan ? historyScan->snapshotMaxSeqno : scanCtx.maxSeqno,
+            scanCtx.persistedCompletedSeqno,
+            scanCtx.maxVisibleSeqno,
+            scanCtx.timestamp,
+            historyScan ? ActiveStream::SnapshotSource::NoHistoryPrologue
+                        : ActiveStream::SnapshotSource::NoHistory);
 }
 
 // This function is used for backfills where the stream is configured as a
@@ -348,7 +370,8 @@ bool DCPBackfillBySeqnoDisk::markLegacyDiskSnapshot(ActiveStream& stream,
                                        scanCtx.maxSeqno,
                                        scanCtx.persistedCompletedSeqno,
                                        scanCtx.maxVisibleSeqno,
-                                       scanCtx.timestamp);
+                                       scanCtx.timestamp,
+                                       ActiveStream::SnapshotSource::NoHistory);
     }
 
     // Need to figure out the maxSeqno/maxVisibleSeqno for calling
@@ -433,7 +456,12 @@ bool DCPBackfillBySeqnoDisk::markLegacyDiskSnapshot(ActiveStream& stream,
         if (gv.item->isCommitted()) {
             // Step 3. If this is a committed item, done.
             return stream.markDiskSnapshot(
-                    startSeqno, stats.highSeqno, {}, stats.highSeqno, {});
+                    startSeqno,
+                    stats.highSeqno,
+                    {},
+                    stats.highSeqno,
+                    {},
+                    ActiveStream::SnapshotSource::NoHistory);
         }
     } else if (gv.getStatus() != cb::engine_errc::no_such_key) {
         stream.log(spdlog::level::level_enum::warn,
@@ -546,8 +574,12 @@ bool DCPBackfillBySeqnoDisk::markLegacyDiskSnapshot(ActiveStream& stream,
             cb.maxVisibleSeqno < backfillRangeEndSeqno) {
             stream.setEndSeqno(cb.maxVisibleSeqno);
         }
-        return stream.markDiskSnapshot(
-                startSeqno, cb.maxVisibleSeqno, {}, cb.maxVisibleSeqno, {});
+        return stream.markDiskSnapshot(startSeqno,
+                                       cb.maxVisibleSeqno,
+                                       {},
+                                       cb.maxVisibleSeqno,
+                                       {},
+                                       ActiveStream::SnapshotSource::NoHistory);
     } else {
         endStreamIfNeeded();
         // Found nothing committed at all
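
The "HistoryScan" comment in `markDiskSnapshot` describes how the first marker of a split disk snapshot advertises the end of the whole snapshot rather than the end of the non-history scan. A minimal sketch of that selection follows; `HistoryScanModel`, `FirstMarker` and `buildFirstMarker` are hypothetical names, and the assumption that the non-history scan's own maximum is smaller than `snapshotMaxSeqno` is made only for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for the state that setupForHistoryScan records.
struct HistoryScanModel {
    uint64_t snapshotMaxSeqno; // end of the entire disk snapshot
};

struct FirstMarker {
    uint64_t start;
    uint64_t end;
    bool prologue; // true => SnapshotSource::NoHistoryPrologue
};

// Mirrors the argument selection in DCPBackfillBySeqnoDisk::markDiskSnapshot:
// with a pending history scan the marker spans the entire disk snapshot.
FirstMarker buildFirstMarker(
        uint64_t startSeqno,
        uint64_t scanMaxSeqno,
        const std::optional<HistoryScanModel>& historyScan) {
    return FirstMarker{
            startSeqno,
            historyScan ? historyScan->snapshotMaxSeqno : scanMaxSeqno,
            historyScan.has_value()};
}
```

For the nh{1,10} h{11,20} example, `buildFirstMarker(0, 10, HistoryScanModel{20})` produces a marker with end 20, which is what lets a client that disconnects after seqno 10 recognise it holds a partial snapshot.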
