Skip to content

Commit 03c77b9

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-34017: Send HCS in Disk snapshot
Send the HCS in Disk snapshots so that the replica node can flush a correct HCS at the end of the snapshot which allows a fast warmup (HCS to HPS) if we do not do any more SyncWrites after our Disk snapshot. In a future patch, we will pass this through to the flusher via the CheckpointManager. Change-Id: I36631b53611e903791643f7ce960960eea3f548f Reviewed-on: http://review.couchbase.org/113187 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent c35974d commit 03c77b9

File tree

14 files changed

+127
-23
lines changed

14 files changed

+127
-23
lines changed

daemon/mcbp_validators.cc

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,28 @@ static Status dcp_stream_end_validator(Cookie& cookie) {
564564
}
565565

566566
static Status dcp_snapshot_marker_validator(Cookie& cookie) {
567+
auto& header = cookie.getHeader();
568+
569+
// Pass the extras len in because we will check it manually as it is
570+
// variable length
567571
auto status = McbpValidator::verify_header(cookie,
568-
20,
572+
header.getExtlen(),
569573
ExpectedKeyLen::Zero,
570574
ExpectedValueLen::Zero,
571575
ExpectedCas::Any,
572576
PROTOCOL_BINARY_RAW_BYTES);
573577
if (status != Status::Success) {
574578
return status;
575579
}
580+
581+
// Validate our extras length is correct
582+
using cb::mcbp::request::DcpSnapshotMarkerV1Payload;
583+
using cb::mcbp::request::DcpSnapshotMarkerV2Payload;
584+
if (!(header.getExtlen() == sizeof(DcpSnapshotMarkerV1Payload) ||
585+
header.getExtlen() == sizeof(DcpSnapshotMarkerV2Payload))) {
586+
return Status::Einval;
587+
}
588+
576589
return verify_common_dcp_restrictions(cookie);
577590
}
578591

engines/ep/src/couch-kvstore/couch-kvstore.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,6 +1433,19 @@ ScanContext* CouchKVStore::initScanContext(
14331433
return NULL;
14341434
}
14351435

1436+
auto* vbState = getVBucketState(vbid);
1437+
if (!vbState) {
1438+
if (readVBState(db, vbid) == CouchKVStore::ReadVBStateStatus::Success) {
1439+
vbState = getVBucketState(vbid);
1440+
}
1441+
if (!vbState) {
1442+
EP_LOG_WARN(
1443+
"CouchKVStore::initScanContext:Failed to obtain vbState for"
1444+
"the highCompletedSeqno");
1445+
return NULL;
1446+
}
1447+
}
1448+
14361449
size_t scanId = scanCounter++;
14371450

14381451
auto collectionsManifest = getDroppedCollections(*db);
@@ -1452,6 +1465,7 @@ ScanContext* CouchKVStore::initScanContext(
14521465
options,
14531466
valOptions,
14541467
count,
1468+
vbState->highCompletedSeqno,
14551469
configuration,
14561470
collectionsManifest);
14571471
sctx->logger = &logger;

engines/ep/src/dcp/active_stream.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,10 @@ void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
234234
}
235235
}
236236

237-
void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
237+
void ActiveStream::markDiskSnapshot(
238+
uint64_t startSeqno,
239+
uint64_t endSeqno,
240+
boost::optional<uint64_t> highCompletedSeqno) {
238241
{
239242
LockHolder lh(streamMutex);
240243
uint64_t chkCursorSeqno = endSeqno;
@@ -291,14 +294,15 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
291294
logPrefix,
292295
startSeqno,
293296
endSeqno);
294-
// @TODO push through HCS
297+
// If the stream supports SyncRep then send the HCS in the
298+
// SnapshotMarker
295299
pushToReadyQ(std::make_unique<SnapshotMarker>(
296300
opaque_,
297301
vb_,
298302
startSeqno,
299303
endSeqno,
300304
MARKER_FLAG_DISK | MARKER_FLAG_CHK,
301-
boost::none /*HCS*/,
305+
supportSyncReplication() ? highCompletedSeqno : boost::none,
302306
sid));
303307
lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
304308

engines/ep/src/dcp/active_stream.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ class ActiveStream : public Stream,
115115
backfillRemaining.fetch_add(by, std::memory_order_relaxed);
116116
}
117117

118-
void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
118+
void markDiskSnapshot(uint64_t startSeqno,
119+
uint64_t endSeqno,
120+
boost::optional<uint64_t> highCompletedSeqno);
119121

120122
bool backfillReceived(std::unique_ptr<Item> itm,
121123
backfill_source_t backfill_source,

engines/ep/src/dcp/backfill_disk.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ backfill_status_t DCPBackfillDisk::create() {
248248
transitionState(backfill_state_done);
249249
} else {
250250
stream->incrBackfillRemaining(scanCtx->documentCount);
251-
stream->markDiskSnapshot(startSeqno, scanCtx->maxSeqno);
251+
stream->markDiskSnapshot(
252+
startSeqno, scanCtx->maxSeqno, scanCtx->highCompletedSeqno);
252253
transitionState(backfill_state_scanning);
253254
}
254255

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ backfill_status_t DCPBackfillMemory::run() {
9090
stream->incrBackfillRemaining(items.size());
9191

9292
/* Mark disk snapshot */
93-
stream->markDiskSnapshot(startSeqno, adjustedEndSeqno);
93+
stream->markDiskSnapshot(startSeqno, adjustedEndSeqno, {});
9494

9595
/* Move every item to the stream */
9696
for (auto& item : items) {
@@ -249,7 +249,8 @@ backfill_status_t DCPBackfillMemoryBuffered::create() {
249249
std::min(endSeqno, static_cast<uint64_t>(rangeItr.back()));
250250

251251
/* Mark disk snapshot */
252-
stream->markDiskSnapshot(startSeqno, endSeqno);
252+
stream->markDiskSnapshot(
253+
startSeqno, endSeqno, evb->getHighCompletedSeqno());
253254

254255
/* Change the backfill state */
255256
transitionState(BackfillState::Scanning);

engines/ep/src/dcp/passive_stream.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,8 +869,6 @@ ENGINE_ERROR_CODE PassiveStream::processDropScope(VBucket& vb,
869869
}
870870

871871
void PassiveStream::processMarker(SnapshotMarker* marker) {
872-
// @TODO sanity check - remove when we send the HCS
873-
Expects(!marker->getHighCompletedSeqno());
874872
VBucketPtr vb = engine->getVBucket(vb_);
875873

876874
cur_snapshot_start.store(marker->getStartSeqno());

engines/ep/src/kvstore.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ ScanContext::ScanContext(
5454
DocumentFilter _docFilter,
5555
ValueFilter _valFilter,
5656
uint64_t _documentCount,
57+
uint64_t highCompletedSeqno,
5758
const KVStoreConfig& _config,
5859
const std::vector<Collections::KVStore::DroppedCollection>&
5960
droppedCollections)
@@ -68,6 +69,7 @@ ScanContext::ScanContext(
6869
docFilter(_docFilter),
6970
valFilter(_valFilter),
7071
documentCount(_documentCount),
72+
highCompletedSeqno(highCompletedSeqno),
7173
logger(globalBucketLogger.get()),
7274
config(_config),
7375
collectionsContext(droppedCollections) {

engines/ep/src/kvstore.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ class ScanContext {
211211
DocumentFilter _docFilter,
212212
ValueFilter _valFilter,
213213
uint64_t _documentCount,
214+
uint64_t highCompletedSeqno,
214215
const KVStoreConfig& _config,
215216
const std::vector<Collections::KVStore::DroppedCollection>&
216217
droppedCollections);
@@ -227,6 +228,7 @@ class ScanContext {
227228
const DocumentFilter docFilter;
228229
const ValueFilter valFilter;
229230
const uint64_t documentCount;
231+
const uint64_t highCompletedSeqno;
230232

231233
BucketLogger* logger;
232234
const KVStoreConfig& config;

engines/ep/src/magma-kvstore/magma-kvstore.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ class MagmaScanContext : public ScanContext {
13491349
DocumentFilter _docFilter,
13501350
ValueFilter _valFilter,
13511351
uint64_t _documentCount,
1352+
uint64_t highCompletedSeqno,
13521353
const KVStoreConfig& _config,
13531354
const std::vector<Collections::KVStore::DroppedCollection>&
13541355
droppedCollections,
@@ -1363,6 +1364,7 @@ class MagmaScanContext : public ScanContext {
13631364
_docFilter,
13641365
_valFilter,
13651366
_documentCount,
1367+
highCompletedSeqno,
13661368
_config,
13671369
droppedCollections),
13681370
kvHandle(kvHandle) {
@@ -1386,6 +1388,7 @@ ScanContext* MagmaKVStore::initScanContext(
13861388
uint64_t highSeqno;
13871389
uint64_t purgeSeqno;
13881390
uint64_t docCount;
1391+
uint64_t highCompletedSeqno;
13891392
{
13901393
std::shared_lock<std::shared_timed_mutex> lock(kvHandle->vbstateMutex);
13911394
auto vbstate = cachedVBStates[vbid.get()].get();
@@ -1397,6 +1400,7 @@ ScanContext* MagmaKVStore::initScanContext(
13971400
highSeqno = vbstate->highSeqno;
13981401
purgeSeqno = vbstate->purgeSeqno;
13991402
docCount = cachedMagmaInfo[vbid.get()]->docCount;
1403+
highCompletedSeqno = vbstate->highCompletedSeqno;
14001404
}
14011405

14021406
auto collectionsManifest = getDroppedCollections(vbid);
@@ -1460,6 +1464,7 @@ ScanContext* MagmaKVStore::initScanContext(
14601464
options,
14611465
valOptions,
14621466
docCount,
1467+
highCompletedSeqno,
14631468
configuration,
14641469
collectionsManifest,
14651470
kvHandle);

0 commit comments

Comments
 (0)