Skip to content

Commit 9a5827d

Browse files
committed
MB-35607: Correct HCS flushing logic
Currently the HCS flushing logic is flawed in two ways: 1) If we disconnect and reconnect a stream then flush a disk snapshot the HCS may be weakly monotonic (if the active node moves on and streams to the replica from disk without doing any other SyncWrites). 2) getItemsForCursor is returning a HCS value before the checkpoint end. This is incorrect as we can flush partial disk snapshots and a subsequent warmup may bring the node back in a bad state (missing prepares). This should work fine if the rest of the disk snapshot is streamed, but if this node is promoted beforehand and streams from memory to a replica then the replica will start firing assertions. This is a dataloss situation already, but undesirable nonetheless. Change-Id: I700e25d248968ce01abd68236a61fe3a960b11a5 Reviewed-on: http://review.couchbase.org/113552 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent a211154 commit 9a5827d

File tree

7 files changed

+73
-18
lines changed

7 files changed

+73
-18
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
#include <boost/optional/optional_io.hpp>
1819
#include <gsl.h>
1920
#include <platform/checked_snprintf.h>
2021
#include <string>
@@ -385,8 +386,8 @@ std::ostream& operator <<(std::ostream& os, const Checkpoint& c) {
385386
<< " snap:{" << c.getSnapshotStartSeqno() << ","
386387
<< c.getSnapshotEndSeqno() << "}"
387388
<< " state:" << to_string(c.getState())
388-
<< " type:" << to_string(c.getCheckpointType()) << " items:["
389-
<< std::endl;
389+
<< " type:" << to_string(c.getCheckpointType())
390+
<< " hcs:" << c.getHighCompletedSeqno() << " items:[" << std::endl;
390391
for (const auto& e : c.toWrite) {
391392
os << "\t{" << e->getBySeqno() << "," << to_string(e->getOperation());
392393
e->isDeleted() ? os << "[d]," : os << ",";

engines/ep/src/checkpoint.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ class Checkpoint {
525525
highCompletedSeqno = seqno;
526526
}
527527

528-
boost::optional<uint64_t> getHighCompletedSeqno() {
528+
boost::optional<uint64_t> getHighCompletedSeqno() const {
529529
return highCompletedSeqno;
530530
}
531531

engines/ep/src/checkpoint_manager.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,6 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
855855
// limit.
856856
ItemsForCursor result((*cursor.currentCheckpoint)->getSnapshotStartSeqno(),
857857
(*cursor.currentCheckpoint)->getSnapshotEndSeqno(),
858-
(*cursor.currentCheckpoint)->getHighCompletedSeqno(),
859858
(*cursor.currentCheckpoint)->getCheckpointType());
860859

861860
size_t itemCount = 0;
@@ -875,18 +874,18 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
875874
itemCount++;
876875

877876
if (qi->getOperation() == queue_op::checkpoint_end) {
877+
// Only move the HCS at checkpoint end (don't want to flush a
878+
// HCS mid-checkpoint).
879+
result.highCompletedSeqno =
880+
(*cursor.currentCheckpoint)->getHighCompletedSeqno();
881+
878882
// Reached the end of a checkpoint; check if we have exceeded
879883
// our limit.
880884
if (itemCount >= approxLimit) {
881885
// Reached our limit - don't want any more items.
882886
result.range.setEnd(
883887
(*cursor.currentCheckpoint)->getSnapshotEndSeqno());
884888

885-
// Only move the HCS at checkpoint end (don't want to flush a
886-
// HCS mid-checkpoint).
887-
result.highCompletedSeqno =
888-
(*cursor.currentCheckpoint)->getHighCompletedSeqno();
889-
890889
// However, we *do* want to move the cursor into the next
891890
// checkpoint if possible; as that means the checkpoint we just
892891
// completed has one less cursor in it (and could potentially be

engines/ep/src/checkpoint_manager.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,19 @@ class CheckpointManager {
5959
struct ItemsForCursor {
6060
ItemsForCursor(uint64_t start,
6161
uint64_t end,
62-
boost::optional<uint64_t> highCompletedSeqno = {},
63-
CheckpointType checkpointType = CheckpointType::Memory)
62+
CheckpointType checkpointType = CheckpointType::Memory,
63+
boost::optional<uint64_t> highCompletedSeqno = {})
6464
: range(start, end),
65-
highCompletedSeqno(highCompletedSeqno),
66-
checkpointType(checkpointType) {
65+
checkpointType(checkpointType),
66+
highCompletedSeqno(highCompletedSeqno) {
6767
}
6868
snapshot_range_t range;
6969
bool moreAvailable = {false};
70+
CheckpointType checkpointType = CheckpointType::Memory;
7071

71-
// HCS that should be flushed
72+
// HCS that should be flushed. Currently should only be set for Disk
73+
// Checkpoint runs.
7274
boost::optional<uint64_t> highCompletedSeqno = {};
73-
CheckpointType checkpointType = CheckpointType::Memory;
7475
};
7576

7677
/// Return type of expelUnreferencedCheckpointItems()

engines/ep/src/ep_bucket.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,16 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
417417
// any other item in this flush batch. This is required because we
418418
// send mutations instead of a commits and would not otherwise
419419
// update the HCS on disk.
420-
boost::optional<uint64_t> hcs = toFlush.highCompletedSeqno;
420+
boost::optional<uint64_t> hcs =
421+
boost::make_optional(false, uint64_t());
422+
423+
// HCS may be weakly monotonic when received via a disk snapshot so
424+
// we special case this for the disk snapshot instead of relaxing
425+
// the general constraint.
426+
if (toFlush.highCompletedSeqno &&
427+
*toFlush.highCompletedSeqno != vbstate.highCompletedSeqno) {
428+
hcs = toFlush.highCompletedSeqno;
429+
}
421430
// HPS is optional because we have to update it on disk only if a
422431
// prepare is found in the flush-batch
423432
boost::optional<uint64_t> hps;

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -985,9 +985,25 @@ TEST_P(DurabilityPassiveStreamPersistentTest,
985985
EXPECT_EQ(1, ack.getPreparedSeqno());
986986
}
987987

988-
TEST_P(DurabilityPassiveStreamPersistentTest, DiskSnapshotHCSPersisted) {
988+
void DurabilityPassiveStreamPersistentTest::testDiskSnapshotHCSPersisted() {
989989
testReceiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDisk(
990990
DocumentState::Alive);
991+
992+
// We won't flush a HCS from a snapshot marker until we process the entire
993+
// checkpoint, this is because we need to be pessimistic with flushing the
994+
// HCS in a disk checkpoint for our warmup optimization. If we flush a HCS
995+
// that is too high whilst receiving a disk snapshot we may end up in some
996+
// inconsistent state due to out of order commit.
997+
SnapshotMarker marker(
998+
0 /*opaque*/,
999+
vbid,
1000+
5 /*snapStart*/,
1001+
6 /*snapEnd*/,
1002+
dcp_marker_flag_t::MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
1003+
{} /*HCS*/,
1004+
{} /*streamId*/);
1005+
stream->processMarker(&marker);
1006+
9911007
flushVBucketToDiskIfPersistent(vbid, 2);
9921008
{
9931009
auto vb = store->getVBucket(vbid);
@@ -1009,9 +1025,31 @@ TEST_P(DurabilityPassiveStreamPersistentTest, DiskSnapshotHCSPersisted) {
10091025
{
10101026
auto vb = store->getVBucket(vbid);
10111027
EXPECT_EQ(2, vb->getHighCompletedSeqno());
1028+
EXPECT_EQ(4, vb->getHighSeqno());
10121029
}
10131030
}
10141031

1032+
TEST_P(DurabilityPassiveStreamPersistentTest, DiskSnapshotHCSPersisted) {
1033+
testDiskSnapshotHCSPersisted();
1034+
}
1035+
1036+
TEST_P(DurabilityPassiveStreamPersistentTest,
1037+
DiskSnapshotHCSIgnoredIfWeaklyMonotonic) {
1038+
testDiskSnapshotHCSPersisted();
1039+
SnapshotMarker marker(0 /*opaque*/,
1040+
vbid,
1041+
6 /*snapStart*/,
1042+
7 /*snapEnd*/,
1043+
dcp_marker_flag_t::MARKER_FLAG_DISK | MARKER_FLAG_CHK,
1044+
2 /*HCS*/,
1045+
{} /*streamId*/);
1046+
stream->processMarker(&marker);
1047+
1048+
// We don't flush any items but we will run the flusher which will advance
1049+
// use out of the checkpoint
1050+
flushVBucketToDiskIfPersistent(vbid, 0);
1051+
}
1052+
10151053
TEST_P(DurabilityPassiveStreamTest,
10161054
NoSeqnoAckOnStreamAcceptanceIfNotSupported) {
10171055
consumer->disableSyncReplication();

engines/ep/tests/module_tests/dcp_durability_stream_test.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,14 @@ class DurabilityPassiveStreamTest
155155
* Single-threaded.
156156
*/
157157
class DurabilityPassiveStreamPersistentTest
158-
: public DurabilityPassiveStreamTest {};
158+
: public DurabilityPassiveStreamTest {
159+
protected:
160+
/**
161+
* Test that hte HCS sent in a disk snapshot is persisted by sending sending
162+
* a disk snapshot containing a mutation instead of a commit.
163+
*/
164+
void testDiskSnapshotHCSPersisted();
165+
};
159166

160167
/**
161168
* ActiveStream tests for Durability against ephemeral buckets. Single-threaded.

0 commit comments

Comments
 (0)