Skip to content

Commit a2b7748

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-34017: Flush HCS from Disk Checkpoint
Flush the HCS stored in Disk checkpoints when we flush Disk checkpoints so that we have a correct HCS which will be used to optimise warmup as often as possible. Change-Id: I6db95f60f82259ebbedcae048824ca2d885a8e93 Reviewed-on: http://review.couchbase.org/113255 Reviewed-by: James Harrison <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 0abe1fd commit a2b7748

13 files changed

+145
-32
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ Checkpoint::Checkpoint(EPStats& st,
9494
uint64_t id,
9595
uint64_t snapStart,
9696
uint64_t snapEnd,
97+
boost::optional<uint64_t> highCompletedSeqno,
9798
Vbid vbid,
9899
CheckpointType checkpointType)
99100
: stats(st),
@@ -110,7 +111,8 @@ Checkpoint::Checkpoint(EPStats& st,
110111
metaKeyIndex(keyIndexTrackingAllocator),
111112
keyIndexMemUsage(0),
112113
queuedItemsMemUsage(0),
113-
checkpointType(checkpointType) {
114+
checkpointType(checkpointType),
115+
highCompletedSeqno(highCompletedSeqno) {
114116
stats.coreLocal.get()->memOverhead.fetch_add(sizeof(Checkpoint));
115117
}
116118

engines/ep/src/checkpoint.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "item.h"
2424
#include "monotonic.h"
2525

26+
#include <boost/optional.hpp>
2627
#include <folly/Synchronized.h>
2728
#include <platform/non_negative_counter.h>
2829
#include <utilities/memory_tracking_allocator.h>
@@ -394,6 +395,7 @@ class Checkpoint {
394395
uint64_t id,
395396
uint64_t snapStart,
396397
uint64_t snapEnd,
398+
boost::optional<uint64_t> highCompletedSeqno,
397399
Vbid vbid,
398400
CheckpointType checkpointType);
399401

@@ -519,6 +521,14 @@ class Checkpoint {
519521
checkpointType = type;
520522
}
521523

524+
void setHighCompletedSeqno(boost::optional<uint64_t> seqno) {
525+
highCompletedSeqno = seqno;
526+
}
527+
528+
boost::optional<uint64_t> getHighCompletedSeqno() {
529+
return highCompletedSeqno;
530+
}
531+
522532
/**
523533
* Returns an iterator pointing to the beginning of the CheckpointQueue,
524534
* toWrite.
@@ -639,5 +649,12 @@ class Checkpoint {
639649
// Is this a checkpoint created by a replica from a received disk snapshot?
640650
CheckpointType checkpointType;
641651

652+
// The SyncRep HCS for this checkpoint. Used to ensure that we flush a
653+
// correct HCS at the end of a snapshot to disk. This is optional as it is
654+
// only necessary for Disk snapshot (due to de-dupe) and the way we retrieve
655+
// items from the CheckpointManager for memory snapshots makes it
656+
// non-trivial to send the HCS in memory snapshot markers.
657+
boost::optional<uint64_t> highCompletedSeqno;
658+
642659
friend std::ostream& operator <<(std::ostream& os, const Checkpoint& m);
643660
};

engines/ep/src/checkpoint_manager.cc

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ CheckpointManager::CheckpointManager(EPStats& st,
5252
// when the checkpointList is empty.
5353
// Only in CheckpointManager::clear_UNLOCKED, the checkpointList
5454
// is temporarily cleared and a new open checkpoint added immediately.
55-
addOpenCheckpoint(1, lastSnapStart, lastSnapEnd, CheckpointType::Memory);
55+
addOpenCheckpoint(
56+
1, lastSnapStart, lastSnapEnd, {}, CheckpointType::Memory);
5657

5758
if (checkpointConfig.isPersistenceEnabled()) {
5859
// Register the persistence cursor
@@ -137,13 +138,14 @@ Checkpoint& CheckpointManager::getOpenCheckpoint_UNLOCKED(
137138

138139
void CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
139140
addNewCheckpoint_UNLOCKED(
140-
id, lastBySeqno, lastBySeqno, CheckpointType::Memory);
141+
id, lastBySeqno, lastBySeqno, {}, CheckpointType::Memory);
141142
}
142143

143144
void CheckpointManager::addNewCheckpoint_UNLOCKED(
144145
uint64_t id,
145146
uint64_t snapStartSeqno,
146147
uint64_t snapEndSeqno,
148+
boost::optional<uint64_t> highCompletedSeqno,
147149
CheckpointType checkpointType) {
148150
// First, we must close the open checkpoint.
149151
auto& oldOpenCkpt = *checkpointList.back();
@@ -169,7 +171,11 @@ void CheckpointManager::addNewCheckpoint_UNLOCKED(
169171
id,
170172
snapStartSeqno,
171173
snapEndSeqno);
172-
addOpenCheckpoint(id, snapStartSeqno, snapEndSeqno, checkpointType);
174+
addOpenCheckpoint(id,
175+
snapStartSeqno,
176+
snapEndSeqno,
177+
highCompletedSeqno,
178+
checkpointType);
173179

174180
/* If cursors reached to the end of its current checkpoint, move it to the
175181
next checkpoint. DCP and Persistence cursors can skip a "checkpoint end"
@@ -202,16 +208,23 @@ void CheckpointManager::addNewCheckpoint_UNLOCKED(
202208
}
203209
}
204210

205-
void CheckpointManager::addOpenCheckpoint(uint64_t id,
206-
uint64_t snapStart,
207-
uint64_t snapEnd,
208-
CheckpointType checkpointType) {
211+
void CheckpointManager::addOpenCheckpoint(
212+
uint64_t id,
213+
uint64_t snapStart,
214+
uint64_t snapEnd,
215+
boost::optional<uint64_t> highCompletedSeqno,
216+
CheckpointType checkpointType) {
209217
Expects(checkpointList.empty() ||
210218
checkpointList.back()->getState() ==
211219
checkpoint_state::CHECKPOINT_CLOSED);
212220

213-
auto ckpt = std::make_unique<Checkpoint>(
214-
stats, id, snapStart, snapEnd, vbucketId, checkpointType);
221+
auto ckpt = std::make_unique<Checkpoint>(stats,
222+
id,
223+
snapStart,
224+
snapEnd,
225+
highCompletedSeqno,
226+
vbucketId,
227+
checkpointType);
215228
// Add an empty-item into the new checkpoint.
216229
// We need this because every CheckpointCursor will point to this empty-item
217230
// at creation. So, the cursor will point at the first actual non-meta item
@@ -842,6 +855,7 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
842855
// limit.
843856
ItemsForCursor result((*cursor.currentCheckpoint)->getSnapshotStartSeqno(),
844857
(*cursor.currentCheckpoint)->getSnapshotEndSeqno(),
858+
(*cursor.currentCheckpoint)->getHighCompletedSeqno(),
845859
(*cursor.currentCheckpoint)->getCheckpointType());
846860

847861
size_t itemCount = 0;
@@ -868,6 +882,11 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
868882
result.range.setEnd(
869883
(*cursor.currentCheckpoint)->getSnapshotEndSeqno());
870884

885+
// Only move the HCS at checkpoint end (don't want to flush a
886+
// HCS mid-checkpoint).
887+
result.highCompletedSeqno =
888+
(*cursor.currentCheckpoint)->getHighCompletedSeqno();
889+
871890
// However, we *do* want to move the cursor into the next
872891
// checkpoint if possible; as that means the checkpoint we just
873892
// completed has one less cursor in it (and could potentially be
@@ -952,6 +971,7 @@ void CheckpointManager::clear_UNLOCKED(vbucket_state_t vbState, uint64_t seqno)
952971
addOpenCheckpoint(vbucket_state_active ? 1 : 0 /* id */,
953972
lastBySeqno,
954973
lastBySeqno,
974+
{},
955975
CheckpointType::Memory);
956976
resetCursors();
957977
}
@@ -1077,9 +1097,11 @@ void CheckpointManager::setBackfillPhase(uint64_t start, uint64_t end) {
10771097
openCkpt.setSnapshotEndSeqno(end);
10781098
}
10791099

1080-
void CheckpointManager::createSnapshot(uint64_t snapStartSeqno,
1081-
uint64_t snapEndSeqno,
1082-
CheckpointType checkpointType) {
1100+
void CheckpointManager::createSnapshot(
1101+
uint64_t snapStartSeqno,
1102+
uint64_t snapEndSeqno,
1103+
boost::optional<uint64_t> highCompletedSeqno,
1104+
CheckpointType checkpointType) {
10831105
LockHolder lh(queueLock);
10841106

10851107
auto& openCkpt = getOpenCheckpoint_UNLOCKED(lh);
@@ -1093,11 +1115,15 @@ void CheckpointManager::createSnapshot(uint64_t snapStartSeqno,
10931115
openCkpt.setSnapshotStartSeqno(snapStartSeqno);
10941116
openCkpt.setSnapshotEndSeqno(snapEndSeqno);
10951117
openCkpt.setCheckpointType(checkpointType);
1118+
openCkpt.setHighCompletedSeqno(highCompletedSeqno);
10961119
return;
10971120
}
10981121

1099-
addNewCheckpoint_UNLOCKED(
1100-
openCkptId + 1, snapStartSeqno, snapEndSeqno, checkpointType);
1122+
addNewCheckpoint_UNLOCKED(openCkptId + 1,
1123+
snapStartSeqno,
1124+
snapEndSeqno,
1125+
highCompletedSeqno,
1126+
checkpointType);
11011127
}
11021128

11031129
void CheckpointManager::resetSnapshotRange() {

engines/ep/src/checkpoint_manager.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "monotonic.h"
2424
#include "queue_op.h"
2525

26+
#include <boost/optional.hpp>
2627
#include <memcached/engine_common.h>
2728
#include <memcached/vbucket.h>
2829
#include <memory>
@@ -58,11 +59,17 @@ class CheckpointManager {
5859
struct ItemsForCursor {
5960
ItemsForCursor(uint64_t start,
6061
uint64_t end,
62+
boost::optional<uint64_t> highCompletedSeqno = {},
6163
CheckpointType checkpointType = CheckpointType::Memory)
62-
: range(start, end), checkpointType(checkpointType) {
64+
: range(start, end),
65+
highCompletedSeqno(highCompletedSeqno),
66+
checkpointType(checkpointType) {
6367
}
6468
snapshot_range_t range;
6569
bool moreAvailable = {false};
70+
71+
// HCS that should be flushed
72+
boost::optional<uint64_t> highCompletedSeqno = {};
6673
CheckpointType checkpointType = CheckpointType::Memory;
6774
};
6875

@@ -338,6 +345,7 @@ class CheckpointManager {
338345

339346
void createSnapshot(uint64_t snapStartSeqno,
340347
uint64_t snapEndSeqno,
348+
boost::optional<uint64_t> highCompletedSeqno,
341349
CheckpointType checkpointType);
342350

343351
void resetSnapshotRange();
@@ -421,12 +429,14 @@ class CheckpointManager {
421429
* @param id for the new checkpoint
422430
* @param snapStartSeqno for the new checkpoint
423431
* @param snapEndSeqno for the new checkpoint
432+
* @param highCompletedSeqno optional SyncRep HCS to be flushed to disk
424433
* @param checkpointType is the checkpoint created from a replica receiving
425434
* a disk snapshot?
426435
*/
427436
void addNewCheckpoint_UNLOCKED(uint64_t id,
428437
uint64_t snapStartSeqno,
429438
uint64_t snapEndSeqno,
439+
boost::optional<uint64_t> highCompletedSeqno,
430440
CheckpointType checkpointType);
431441

432442
/*
@@ -445,12 +455,14 @@ class CheckpointManager {
445455
* @param id for the new checkpoint
446456
* @param snapStartSeqno for the new checkpoint
447457
* @param snapEndSeqno for the new checkpoint
458+
* @param highCompletedSeqno the SyncRepl HCS to be flushed to disk
448459
* @param checkpointType is the checkpoint created from a replica receiving
449460
* a disk snapshot?
450461
*/
451462
void addOpenCheckpoint(uint64_t id,
452463
uint64_t snapStart,
453464
uint64_t snapEnd,
465+
boost::optional<uint64_t> highCompletedSeqno,
454466
CheckpointType checkpointType);
455467

456468
bool moveCursorToNextCheckpoint(CheckpointCursor &cursor);

engines/ep/src/dcp/passive_stream.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,12 +896,14 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
896896
vb->setReceivingInitialDiskSnapshot(true);
897897
ckptMgr.createSnapshot(cur_snapshot_start.load(),
898898
cur_snapshot_end.load(),
899+
marker->getHighCompletedSeqno(),
899900
checkpointType);
900901
} else {
901902
if (marker->getFlags() & MARKER_FLAG_CHK ||
902903
vb->checkpointManager->getOpenCheckpointId() == 0) {
903904
ckptMgr.createSnapshot(cur_snapshot_start.load(),
904905
cur_snapshot_end.load(),
906+
marker->getHighCompletedSeqno(),
905907
checkpointType);
906908
} else {
907909
// If we are reconnecting then we need to update the snap end

engines/ep/src/ep_bucket.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,14 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
409409
Collections::VB::Flush collectionFlush(vb->getManifest());
410410

411411
// HCS is optional because we have to update it on disk only if some
412-
// Commit/Abort SyncWrite is found in the flush-batch
413-
boost::optional<uint64_t> hcs;
412+
// Commit/Abort SyncWrite is found in the flush-batch. If we're
413+
// flushing Disk checkpoints then the toFlush value may be
414+
// supplied. In this case, this should be the HCS received from the
415+
// Active node and should be greater than or equal to the HCS for
416+
// any other item in this flush batch. This is required because we
417+
// send mutations instead of a commits and would not otherwise
418+
// update the HCS on disk.
419+
boost::optional<uint64_t> hcs = toFlush.highCompletedSeqno;
414420
// HPS is optional because we have to update it on disk only if a
415421
// prepare is found in the flush-batch
416422
boost::optional<uint64_t> hps;

engines/ep/src/vbucket.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
407407
auto ckptItems = checkpointManager->getItemsForPersistence(
408408
result.items, ckptMgrLimit);
409409
result.range = ckptItems.range;
410+
result.highCompletedSeqno = ckptItems.highCompletedSeqno;
410411
ckptItemsAvailable = ckptItems.moreAvailable;
411412
stats.persistenceCursorGetItemsHisto.add(
412413
std::chrono::duration_cast<std::chrono::microseconds>(

engines/ep/src/vbucket.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
471471
std::vector<queued_item> items;
472472
snapshot_range_t range{0, 0};
473473
bool moreAvailable = false;
474+
boost::optional<uint64_t> highCompletedSeqno = {};
474475
};
475476

476477
/**

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,33 @@ TEST_P(DurabilityPassiveStreamPersistentTest,
968968
EXPECT_EQ(1, ack.getPreparedSeqno());
969969
}
970970

971+
TEST_P(DurabilityPassiveStreamPersistentTest, DiskSnapshotHCSPersisted) {
972+
testReceiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDisk(
973+
DocumentState::Alive);
974+
flushVBucketToDiskIfPersistent(vbid, 2);
975+
{
976+
auto vb = store->getVBucket(vbid);
977+
EXPECT_EQ(2, vb->getHighCompletedSeqno());
978+
}
979+
980+
// Reset and warmup to check persistence
981+
consumer->closeAllStreams();
982+
consumer.reset();
983+
resetEngineAndWarmup();
984+
// Recreate the consumer so that our normal test TearDown will work
985+
consumer =
986+
std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
987+
consumer->enableSyncReplication();
988+
consumer->addStream(0 /*opaque*/, vbid, 0 /*flags*/);
989+
stream = static_cast<MockPassiveStream*>(
990+
(consumer->getVbucketStream(vbid)).get());
991+
992+
{
993+
auto vb = store->getVBucket(vbid);
994+
EXPECT_EQ(2, vb->getHighCompletedSeqno());
995+
}
996+
}
997+
971998
TEST_P(DurabilityPassiveStreamTest,
972999
NoSeqnoAckOnStreamAcceptanceIfNotSupported) {
9731000
consumer->disableSyncReplication();
@@ -1007,7 +1034,7 @@ void DurabilityPassiveStreamTest::
10071034
2 /*snapStart*/,
10081035
4 /*snapEnd*/,
10091036
dcp_marker_flag_t::MARKER_FLAG_DISK | MARKER_FLAG_CHK,
1010-
{} /*HCS*/,
1037+
2 /*HCS*/,
10111038
{} /*streamId*/);
10121039
stream->processMarker(&marker);
10131040

0 commit comments

Comments
 (0)