
Commit bfa0dd8

MB-50874: Reset snap start if less than lastSeqno on new checkpoint creation
+Problem+

If a replica vBucket is promoted to active, and the last DCP message it
received was a Snapshot Marker whose first mutation had been de-duplicated,
then the snapshot start of the newly-promoted active ends up greater than its
high seqno. Upon the next Flusher run (i.e. the next mutation to the vBucket),
the Flusher throws an exception when trying to fetch items, which terminates
KV-Engine (as the exception is thrown on a BG thread):

    CheckpointManager::queueDirty: lastBySeqno not in snapshot range.
    vb:0 state:active snapshotStart:12 lastBySeqno:11 snapshotEnd:11
    genSeqno:Yes checkpointList.size():2

+Details+

When streaming data from an Active to a Replica vBucket, the extent of the
Checkpoint is sent via DCP using a SnapshotMarker message, followed by N
Mutation / Deletion messages. The snapshot marker may be discontinuous
compared to the previous one if any de-duplication occurred within the
Checkpoint - for example, if document "key" was written sufficient times in
quick succession, one could end up with the following two Checkpoints on the
active, and subsequent DCP SnapshotMarkers sent to the replica:

    CheckpointManager[0x108a03080] with numItems:6 checkpoints:2
        Checkpoint[0x10891f000] with id:2 seqno:{1,10} snap:{0,10, visible:10}
            state:CHECKPOINT_CLOSED numCursors:1 type:Memory hcs:-- items:[
            {10,mutation,cid:0x0:deduplicated_key,119,}
            {11,checkpoint_end,cid:0x1:checkpoint_end,119,[m]}
        ]
        Checkpoint[0x10891fa00] with id:3 seqno:{11,12} snap:{10,12, visible:12}
            state:CHECKPOINT_OPEN numCursors:1 type:Memory hcs:-- items:[
            {11,checkpoint_start,cid:0x1:checkpoint_start,121,[m]}
            {12,mutation,cid:0x0:deduplicated_key,130,}
        ]

Note how there are just two mutations remaining (at seqnos 10 and 12), and
that there is a seqno "gap" at 11 (ignoring meta-items, which are not sent
over DCP). When this is replicated over DCP it will be sent as:

* DCP_SNAPSHOT_MARKER(start:0, end:10, flags=CHK)
* DCP_MUTATION(seqno:10, ...)
* DCP_SNAPSHOT_MARKER(start:12, end:12, flags=CHK)
* DCP_MUTATION(seqno:12, ...)

Note that the second SnapshotMarker being flagged as "CHK" (Checkpoint) is
essential - we need the replica to end up creating a new Checkpoint with the
start and end controlled by the active. A SnapshotMarker without that flag is
insufficient, as it just extends the existing checkpoint, increasing the
checkpoint end but leaving the start unaffected.

Once these messages are replicated over DCP, the replica vBucket should have
state equivalent to the active. However, if the last DCP_MUTATION is not
received - for example if the active node is being failed over and the stream
is closed before the DCP_MUTATION - then the state of the replica (crucially,
the open Checkpoint) is as follows:

    Checkpoint[0x10cecde00] with id:2 seqno:{11,11} snap:{12,12, visible:12}
        state:CHECKPOINT_OPEN numCursors:0 type:Memory hcs:-- items:[
        {11,checkpoint_start,cid:0x1:checkpoint_start,121,[m]}
    ]

When this sequence occurs, the seqno range (11,11) in the open Checkpoint is
less than the snapshot range (12,12). This is problematic as we have
essentially broken an invariant on Checkpoints - that all items within them
lie between the snapshot start and end. It doesn't immediately cause a
problem, but if this vBucket is converted to Active and starts accepting
mutations itself, it will generate seqnos from the last seqno received - 10
in this case. The next mutation is therefore assigned seqno 11, which, when
the flusher is woken and attempts to flush, throws an exception on the BG
thread and crashes KV-Engine.
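To make the broken invariant concrete, the following is a minimal standalone
C++ sketch (illustrative only - not kv_engine code; OpenCheckpoint and the
simplified queueDirty below are invented for this example) of the check that
fails, fed with the failed-over replica's state described above:

    #include <cstdint>
    #include <iostream>
    #include <stdexcept>

    // Invented stand-in for the open Checkpoint's snapshot range.
    struct OpenCheckpoint {
        uint64_t snapStart;
        uint64_t snapEnd;
    };

    // Models queueing a mutation on an active vBucket: the next seqno is
    // generated from lastBySeqno, the open snapshot end is extended to
    // cover it, and the seqno must then lie within the snapshot range.
    void queueDirty(OpenCheckpoint& ckpt, uint64_t& lastBySeqno) {
        ++lastBySeqno;
        ckpt.snapEnd = lastBySeqno;
        if (lastBySeqno < ckpt.snapStart || lastBySeqno > ckpt.snapEnd) {
            throw std::logic_error("lastBySeqno not in snapshot range");
        }
    }

    int main() {
        // Failed-over replica state from above: open snapshot {12,12},
        // but the last seqno actually received was 10.
        OpenCheckpoint ckpt{12, 12};
        uint64_t lastBySeqno = 10;
        try {
            queueDirty(ckpt, lastBySeqno); // assigns seqno 11 < snapStart 12
        } catch (const std::logic_error& e) {
            std::cout << "would crash KV-Engine: " << e.what() << '\n';
        }
    }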
+Solution+

The cleanest way to solve this would be to ensure that the SnapshotMarker has
a start equal to the start of the source Checkpoint - i.e. 11 instead of 12.
That is indeed what has been done to address MB-50333, which is a SyncWrite
variant of this issue. However, that is a medium-sized change and affects the
actual data sent over the wire, so it is more risky for a maintenance fix.
Instead this patch takes a more targeted approach - when we create a new
Checkpoint during setVBucketState, we adjust the snapshot start seqno down to
lastSeqno + 1 if it is greater than the lastSeqno.

Change-Id: Icc6176a3634944800271be0d9d05949c63b29bf4
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/170268
Well-Formed: Restriction Checker
Tested-by: Build Bot <[email protected]>
Reviewed-by: Ben Huddleston <[email protected]>
Reviewed-by: Paolo Cocchi <[email protected]>
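Distilled, the clamp added by this patch (shown in full in the
checkpoint_manager.cc diff below) amounts to the following sketch, where
Checkpoint is a stand-in for the real class:

    #include <cstdint>

    struct Checkpoint {
        uint64_t snapshotStart;
        uint64_t getSnapshotStartSeqno() const { return snapshotStart; }
        void setSnapshotStartSeqno(uint64_t seqno) { snapshotStart = seqno; }
    };

    // Ensure the open checkpoint's snapshot start never exceeds the next
    // seqno this vBucket will assign (lastBySeqno + 1).
    void clampSnapshotStart(Checkpoint& openCkpt, int64_t lastBySeqno) {
        if (static_cast<uint64_t>(lastBySeqno) <
            openCkpt.getSnapshotStartSeqno()) {
            // e.g. lastBySeqno:10, snapStart:12 -> snapStart becomes 11.
            openCkpt.setSnapshotStartSeqno(lastBySeqno + 1);
        }
    }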
1 parent c7ce4bf · commit bfa0dd8

File tree

5 files changed: +106 −8 lines changed


engines/ep/src/checkpoint_config.h

Lines changed: 2 additions & 2 deletions
@@ -62,6 +62,8 @@ class CheckpointConfig {
         return persistenceEnabled;
     }
 
+    static void addConfigChangeListener(EventuallyPersistentEngine& engine);
+
 protected:
     friend class CheckpointConfigChangeListener;
     friend class EventuallyPersistentEngine;
@@ -82,8 +84,6 @@ class CheckpointConfig {
         keepClosedCheckpoints = value;
     }
 
-    static void addConfigChangeListener(EventuallyPersistentEngine& engine);
-
 private:
     class ChangeListener;

engines/ep/src/checkpoint_manager.cc

Lines changed: 27 additions & 5 deletions
@@ -1239,6 +1239,11 @@ size_t CheckpointManager::getNumOpenChkItems() const {
     return getOpenCheckpoint_UNLOCKED(lh).getNumItems();
 }
 
+size_t CheckpointManager::getNumCheckpoints() const {
+    LockHolder lh(queueLock);
+    return checkpointList.size();
+}
+
 uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(const LockHolder& lh,
                                                          bool forceCreation,
                                                          bool timeBound) {
@@ -1456,13 +1461,30 @@ uint64_t CheckpointManager::createNewCheckpoint(bool force) {
     LockHolder lh(queueLock);
 
     const auto& openCkpt = getOpenCheckpoint_UNLOCKED(lh);
+    if (openCkpt.getNumItems() > 0 || force) {
+        addNewCheckpoint_UNLOCKED(openCkpt.getId() + 1);
+    }
 
-    if (openCkpt.getNumItems() == 0 && !force) {
-        return openCkpt.getId();
+    auto& openCkpt2 = getOpenCheckpoint_UNLOCKED(lh);
+
+    /* MB-50874: Ensure that the snapshot start of our newly-active
+     * checkpoint is not greater than CheckpointManager::lastBySeqno.
+     * Note in Neo this issue no longer occurs as the snap_start is sent
+     * correctly - see MB-50333.
+     */
+    if (static_cast<uint64_t>(lastBySeqno) <
+        openCkpt2.getSnapshotStartSeqno()) {
+        EP_LOG_INFO(
+                "CheckpointManager::createNewCheckpoint(): {} Found "
+                "lastBySeqno:{} less than snapStart:{}, adjusting snapStart "
+                "to {}",
+                vbucketId,
+                lastBySeqno,
+                openCkpt2.getSnapshotStartSeqno(),
+                lastBySeqno + 1);
+        openCkpt2.setSnapshotStartSeqno(lastBySeqno + 1);
     }
 
-    addNewCheckpoint_UNLOCKED(openCkpt.getId() + 1);
-    return getOpenCheckpointId_UNLOCKED(lh);
+    return openCkpt2.getId();
 }
 
 uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
@@ -1682,4 +1704,4 @@ FlushHandle::~FlushHandle() {
     }
     // Flush-success path
     manager.removeBackupPersistenceCursor();
-}
+}

engines/ep/src/checkpoint_manager.h

Lines changed: 3 additions & 0 deletions
@@ -331,6 +331,9 @@ class CheckpointManager {
      */
     size_t getNumOpenChkItems() const;
 
+    /// @returns the number of Checkpoints this Manager has.
+    size_t getNumCheckpoints() const;
+
     /* WARNING! This method can return inaccurate counts - see MB-28431. It
      * at *least* can suffer from overcounting by at least 1 (in scenarios as
      * yet not clear).

engines/ep/tests/mock/mock_synchronous_ep_engine.cc

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ SynchronousEPEngine::SynchronousEPEngine(std::string extra_config)
 
     // checkpointConfig is needed by CheckpointManager (via EPStore).
    checkpointConfig = std::make_unique<CheckpointConfig>(*this);
+    CheckpointConfig::addConfigChangeListener(*this);
 
     dcpFlowControlManager_ = std::make_unique<DcpFlowControlManager>(*this);

engines/ep/tests/module_tests/dcp_reflection_test.cc

Lines changed: 73 additions & 1 deletion
@@ -152,6 +152,9 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
 
     void transferResponseMessage();
 
+    /// Inject a CloseStream message into the consumer side of the route.
+    void closeStreamAtConsumer();
+
     std::pair<ActiveStream*, MockPassiveStream*> getStreams();
 
     Vbid vbid;
@@ -485,6 +488,10 @@ void DCPLoopbackStreamTest::DcpRoute::transferResponseMessage() {
     }
 }
 
+void DCPLoopbackStreamTest::DcpRoute::closeStreamAtConsumer() {
+    this->consumer->closeStream(0, vbid, {});
+}
+
 std::pair<cb::engine_errc, uint64_t>
 DCPLoopbackStreamTest::DcpRoute::doStreamRequest(int flags) {
     // Do the add_stream
@@ -996,6 +1003,71 @@ TEST_F(DCPLoopbackStreamTest, MB_36948_SnapshotEndsOnPrepare) {
     EXPECT_EQ(2, replicaVB->checkpointManager->getVisibleSnapshotEndSeqno());
 }
 
+/**
+ * Regression test for MB-50874 - a scenario where a replica:
+ * 1. receives a DCP snapshot marker which has the first seqno de-duplicated
+ * 2. DCP stream is closed (e.g. ns_server failing over the active)
+ * 3. vbucket is promoted to active
+ *
+ * This results in a Checkpoint where the snapshot start - updated from the
+ * SnapshotMarker at (1) - is greater than lastBySeqno, and this ends up
+ * throwing an exception in the Flusher when we next persist anything.
+ */
+TEST_F(DCPLoopbackStreamTest, MB50874_DeDuplicatedMutationsReplicaToActive) {
+    // We need a new checkpoint (MARKER_FLAG_CHK set) when the active node
+    // generates markers - reduce chkMaxItems to the minimum to simplify this.
+    engines[Node0]->getConfiguration().setChkMaxItems(MIN_CHECKPOINT_ITEMS);
+
+    // Setup - fill up the initial checkpoint with items, so when we queue
+    // the next mutations a new checkpoint is created.
+    for (int i = 0; i < MIN_CHECKPOINT_ITEMS; i++) {
+        auto key = makeStoredDocKey("key_" + std::to_string(i));
+        ASSERT_EQ(ENGINE_SUCCESS, storeSet(key));
+    }
+    auto srcVB = engines[Node0]->getVBucket(vbid);
+    ASSERT_EQ(1, srcVB->checkpointManager->getNumCheckpoints());
+
+    // Now modify one more key, which should create a new Checkpoint.
+    auto key = makeStoredDocKey("deduplicated_key");
+    ASSERT_EQ(ENGINE_SUCCESS, storeSet(key));
+    // ... and modify again so we de-duplicate and have a seqno gap.
+    ASSERT_EQ(ENGINE_SUCCESS, storeSet(key));
+
+    // Sanity check our state - should have a 2nd checkpoint now.
+    ASSERT_EQ(2, srcVB->checkpointManager->getNumCheckpoints());
+
+    // Create a DCP connection between node0 and 1, and stream the initial
+    // marker and the 10 mutations.
+    auto route0_1 = createDcpRoute(Node0, Node1);
+    ASSERT_EQ(cb::engine_errc::success, route0_1.doStreamRequest().first);
+    route0_1.transferSnapshotMarker(
+            0, 10, MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
+    for (int i = 0; i < MIN_CHECKPOINT_ITEMS; i++) {
+        route0_1.transferMessage(DcpResponse::Event::Mutation);
+    }
+
+    // Test - transfer the snapshot marker (but no mutations), then close
+    // stream and promote to active; and try to accept a new mutation.
+    route0_1.transferSnapshotMarker(
+            12, 12, MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
+
+    route0_1.closeStreamAtConsumer();
+    engines[Node1]->getKVBucket()->setVBucketState(vbid, vbucket_state_active);
+
+    // Prior to the fix, this check fails.
+    auto& dstCkptMgr = *engines[Node1]->getVBucket(vbid)->checkpointManager;
+    EXPECT_LE(dstCkptMgr.getOpenSnapshotStartSeqno(),
+              dstCkptMgr.getHighSeqno() + 1)
+            << "Checkpoint start should be less than or equal to next seqno "
+               "to be assigned (highSeqno + 1)";
+
+    // Prior to the fix, this throws std::logic_error from
+    // CheckpointManager::queueDirty as lastBySeqno is outside snapshot range.
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engines[Node1]->getKVBucket()->set(
+                      *makeCommittedItem(key, "value"), cookie));
+}
+
 TEST_F(DCPLoopbackStreamTest, MB_41255_dcp_delete_evicted_xattr) {
     auto k1 = makeStoredDocKey("k1");
     EXPECT_EQ(ENGINE_SUCCESS, storeSet(k1, true /*xattr*/));
@@ -1183,4 +1255,4 @@ TEST_P(DCPLoopbackSnapshots, testSnapshots) {
 
 INSTANTIATE_TEST_CASE_P(DCPLoopbackSnapshot,
                         DCPLoopbackSnapshots,
-                        ::testing::Range(1, 10), );
+                        ::testing::Range(1, 10), );
