Skip to content

Commit c4454a5

Browse files
committed
MB-42780: Make replica resilient to missing MARKER_FLAG_CHK
Since 6.5.0 the Active ensures that Disk snapshots are never merged into an existing checkpoint. That is achieve by setting the MARKER_FLAG_CHK in the SnapshotMarker sent to Replica. That all works fine as long as Active behaves as expected. In pre-6.5.0 we had issues where Active could miss the MARKER_FLAG_CHK in markers. In the 6.0.1->6.6.1 upgrade scenario seen in MB-42780, one of the effect is replica failure (with consequent rebalance/upgrade failure). With this patch, replica enforces the same logic that we had already added at active in 6.5.0. That makes replica resilient to any DCP Producer that misses to set the MARKER_FLAG_CHK properly. Change-Id: I9b32dec1caa5b262f0cb524317624e4e955481b2 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/140626 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 9decf85 commit c4454a5

File tree

7 files changed

+276
-27
lines changed

7 files changed

+276
-27
lines changed

engines/ep/src/dcp/passive_stream.cc

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,7 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
924924

925925
cur_snapshot_start.store(marker->getStartSeqno());
926926
cur_snapshot_end.store(marker->getEndSeqno());
927+
const auto prevSnapType = cur_snapshot_type.load();
927928
cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK)
928929
? Snapshot::Disk
929930
: Snapshot::Memory);
@@ -964,14 +965,26 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
964965
// not transmitted (optional is false), set visible to snap-end
965966
auto visibleSeq =
966967
marker->getMaxVisibleSeqno().value_or(marker->getEndSeqno());
968+
967969
if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
970+
// Case: receiving the first snapshot in a Disk snapshot.
971+
// Note that replica may never execute here as the active may switch
972+
// directly to in-memory and send the first snapshot in a Memory
973+
// snapshot.
974+
968975
vb->setReceivingInitialDiskSnapshot(true);
969976
ckptMgr.createSnapshot(cur_snapshot_start.load(),
970977
cur_snapshot_end.load(),
971978
hcs,
972979
checkpointType,
973980
visibleSeq);
974981
} else {
982+
// Case: receiving any type of snapshot (Disk/Memory).
983+
984+
// @todo: The check on CheckpointId=0 is legacy from when that could
985+
// really happen (TAP). Currently CheckpointId is a positive
986+
// monotonic sequence starting from 1, so the check can be
987+
// removed. Deferring to a dedicated change.
975988
if (marker->getFlags() & MARKER_FLAG_CHK ||
976989
vb->checkpointManager->getOpenCheckpointId() == 0) {
977990
ckptMgr.createSnapshot(cur_snapshot_start.load(),
@@ -980,11 +993,24 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
980993
checkpointType,
981994
visibleSeq);
982995
} else {
983-
// If we are reconnecting then we need to update the snap end
984-
// and potentially the checkpoint type as We do not send the
985-
// CHK snapshot marker flag for disk snapshots.
986-
ckptMgr.updateCurrentSnapshot(
987-
cur_snapshot_end.load(), visibleSeq, checkpointType);
996+
// MB-42780: In general we cannot merge multiple snapshots into
997+
// the same checkpoint. The only exception is for when replica
998+
// receives multiple Memory checkpoints in a row.
999+
if (prevSnapType == Snapshot::Memory &&
1000+
cur_snapshot_type == Snapshot::Memory) {
1001+
// If we are reconnecting then we need to update the snap
1002+
// end and potentially the checkpoint type as We do not send
1003+
// the CHK snapshot marker flag for disk snapshots.
1004+
ckptMgr.updateCurrentSnapshot(cur_snapshot_end.load(),
1005+
visibleSeq,
1006+
checkpointType);
1007+
} else {
1008+
ckptMgr.createSnapshot(cur_snapshot_start.load(),
1009+
cur_snapshot_end.load(),
1010+
hcs,
1011+
checkpointType,
1012+
visibleSeq);
1013+
}
9881014
}
9891015
}
9901016

@@ -1035,8 +1061,6 @@ void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
10351061
vb->notifyPassiveDMOfSnapEndReceived(byseqno);
10361062
cur_snapshot_prepare.store(false);
10371063
}
1038-
1039-
cur_snapshot_type.store(Snapshot::None);
10401064
}
10411065
}
10421066

engines/ep/src/dcp/stream.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ enum backfill_source_t {
4444
class Stream {
4545
public:
4646
enum class Snapshot {
47-
None,
48-
Disk,
49-
Memory
47+
None, // Used only at PassiveStream initialization
48+
Disk,
49+
Memory
5050
};
5151

5252
Stream(const std::string& name,

engines/ep/tests/mock/mock_checkpoint_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,9 @@ class MockCheckpointManager : public CheckpointManager {
114114
LockHolder lh(queueLock);
115115
return getOpenCheckpoint_UNLOCKED(lh).getCheckpointType();
116116
}
117+
118+
auto getPersistenceCursorPos() const {
119+
LockHolder lh(queueLock);
120+
return getPersistenceCursor()->currentPos;
121+
}
117122
};

engines/ep/tests/module_tests/checkpoint_test.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,14 +1394,18 @@ void SingleThreadedCheckpointTest::closeReplicaCheckpointOnMemorySnapshotEnd(
13941394
store->deleteVBucket(vb->getId(), cookie);
13951395
}
13961396

1397+
// MB-42780: Test disabled as already marked as "could validly be removed now"
1398+
// above + the test is now legally failing due to changes in the checkpoint-list
13971399
TEST_F(SingleThreadedCheckpointTest,
1398-
CloseReplicaCheckpointOnMemorySnapshotEnd_HighMemDisk) {
1400+
DISABLED_CloseReplicaCheckpointOnMemorySnapshotEnd_HighMemDisk) {
13991401
closeReplicaCheckpointOnMemorySnapshotEnd(
14001402
true, dcp_marker_flag_t::MARKER_FLAG_DISK);
14011403
}
14021404

1405+
// MB-42780: Test disabled as already marked as "could validly be removed now"
1406+
// above + the test is now legally failing due to changes in the checkpoint-list
14031407
TEST_F(SingleThreadedCheckpointTest,
1404-
CloseReplicaCheckpointOnMemorySnapshotEnd_Disk) {
1408+
DISABLED_CloseReplicaCheckpointOnMemorySnapshotEnd_Disk) {
14051409
closeReplicaCheckpointOnMemorySnapshotEnd(
14061410
false, dcp_marker_flag_t::MARKER_FLAG_DISK);
14071411
}

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,7 +1559,7 @@ void DurabilityPassiveStreamTest::
15591559
streamStartSeqno /*snapStart*/,
15601560
streamStartSeqno /*snapEnd*/,
15611561
dcp_marker_flag_t::MARKER_FLAG_DISK,
1562-
{} /*HCS*/,
1562+
0 /*HCS*/,
15631563
{} /*maxVisibleSeqno*/,
15641564
{} /*streamId*/);
15651565
stream->processMarker(&marker);
@@ -1677,7 +1677,7 @@ TEST_P(DurabilityPassiveStreamTest,
16771677
1 /*snapStart*/,
16781678
~1 /*snapEnd*/,
16791679
dcp_marker_flag_t::MARKER_FLAG_DISK,
1680-
{} /*HCS*/,
1680+
0 /*HCS*/,
16811681
{} /*maxVisibleSeqno*/,
16821682
{} /*streamId*/);
16831683
stream->processMarker(&marker);

0 commit comments

Comments
 (0)