Skip to content

Commit 13f946a

Browse files
committed
MB-35001: Consumer: Avoid duplicate PREs after disk/mem transition
Summary: When transitioning to the inMemory phase of an Active DCP stream, ensure that the initial in-memory Snapshot has the CKPT flag set, to avoid the replica potentially seeing duplicate keys in the same Checkpoint. Details: Consider the following scenario of items on disk and in memory (checkpoint manager): Disk: 1:PRE(a), 2:CMT(a), 3:SET(b) Memory: 3:CKPT_START 3:SET(b), 4:PRE(a), 5:SET(c) (items 1..2 were in a removed checkpoint and no longer in-memory.) An ep-engine replica attempting to stream all of this (0..infinity) will result in a backfill of items 1..3, with a checkpoint cursor being placed at seqno:4. Note this isn't the start of the Checkpoint (which is 3) and hence not pointing at a checkpoint_start item. As such when this is streamed over DCP (up to seqno:4) the consumer will see: SNAPSHOT_MARKER(start=1, end=3, flags=DISK|CKPT) 1:PRE(a) 2:CMT(a) 3:SET(b) SNAPSHOT_MARKER(start=4, end=5, flags=MEM) 4:PRE(a), [[missing seqno 5]] If the consumer puts all of these mutations in the same Checkpoint, then it will result in duplicate PRE(a) items (which breaks Checkpoint invariant). Address this by adding the CKPT flag to the Snapshot marker when transitioning between disk and memory snapshots. Note there is still an outstanding issue with the calculation of the failover table branch point as a consequence of this change; however that is being deferred for a second MB - see MB-35003. Change-Id: Idfb3b676136df0a3a6af7e626d793b6e696d7dfe Reviewed-on: http://review.couchbase.org/111908 Reviewed-by: Jim Walker <[email protected]> Tested-by: Dave Rigby <[email protected]>
1 parent e2bd0ad commit 13f946a

File tree

6 files changed

+333
-29
lines changed

6 files changed

+333
-29
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,11 +1616,23 @@ void ActiveStream::transitionState(StreamState newState) {
16161616
endStream(END_STREAM_OK);
16171617
notifyStreamReady();
16181618
} else {
1619+
// Starting a new in-memory snapshot which could contain duplicate
1620+
// keys compared to the previous backfill snapshot. Therefore set
1621+
// the Checkpoint flag on the next snapshot so the Consumer will
1622+
// know to create a new Checkpoint.
1623+
nextSnapshotIsCheckpoint = true;
16191624
nextCheckpointItem();
16201625
}
16211626
break;
16221627
case StreamState::TakeoverSend:
16231628
takeoverStart = ep_current_time();
1629+
1630+
// Starting a new in-memory (takeover) snapshot which could contain
1631+
// duplicate keys compared to the previous Backfill snapshot. Therefore
1632+
// set the Checkpoint flag on the next snapshot so the Consumer will
1633+
// know to create a new Checkpoint.
1634+
nextSnapshotIsCheckpoint = true;
1635+
16241636
if (!nextCheckpointItem()) {
16251637
notifyStreamReady(true);
16261638
}

engines/ep/src/dcp/response.cc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,41 @@ uint32_t MutationResponse::getMessageSize() const {
108108
return header + body;
109109
}
110110

111+
MutationConsumerMessage::MutationConsumerMessage(MutationResponse& response)
112+
: MutationResponse(response.getItem(),
113+
response.getOpaque(),
114+
response.getIncludeValue(),
115+
response.getIncludeXattrs(),
116+
response.getIncludeDeleteTime(),
117+
response.getDocKeyEncodesCollectionId(),
118+
response.getEnableExpiryOutput(),
119+
response.getStreamId()),
120+
emd(nullptr) {
// Builds this message from the item and settings of `response`; emd is
// left null.
121+
// If the item is actually a commit_sync_write which had to be transmitted
122+
// as a DCP_MUTATION (i.e. wrapped in a MutationConsumerMessage), we need
123+
// to recreate the Item as operation==mutation otherwise the Consumer cannot
124+
// handle it.
125+
if (item_->getOperation() == queue_op::commit_sync_write) {
126+
auto* mutation = new Item(item_->getKey(),
127+
item_->getFlags(),
128+
item_->getExptime(),
129+
item_->getValue()->getData(), // NOTE(review): assumes getValue() is non-null for a commit_sync_write - confirm.
130+
item_->getValue()->valueSize(),
131+
item_->getDataType(),
132+
item_->getCas(),
133+
item_->getBySeqno(),
134+
item_->getVBucketId(),
135+
item_->getRevSeqno(),
136+
item_->getNRUValue(),
137+
item_->getNRUValue()); // NOTE(review): getNRUValue() is passed for both of the last two arguments; confirm against Item's constructor that the final parameter is not meant to receive a different field.
138+
// Using const_cast here as item_ is const; alternative would be a
139+
// complex initialiser list with ternaries to move the object
140+
// construction there. Given the object isn't created until after this
141+
// method returns, this seems a reasonable compromise.
142+
const_cast<queued_item&>(item_).reset(mutation); // presumably queued_item is an owning smart pointer that takes ownership of `mutation` here - verify.
143+
}
144+
}
145+
111146
uint32_t MutationConsumerMessage::getMessageSize() const {
112147
uint32_t body = MutationResponse::getMessageSize();
113148

engines/ep/src/dcp/response.h

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -358,19 +358,19 @@ class SnapshotMarker : public DcpResponse {
358358
flags_(flags) {
359359
}
360360

361-
Vbid getVBucket() {
361+
Vbid getVBucket() const {
362362
return vbucket_;
363363
}
364364

365-
uint64_t getStartSeqno() {
365+
uint64_t getStartSeqno() const {
366366
return start_seqno_;
367367
}
368368

369-
uint64_t getEndSeqno() {
369+
uint64_t getEndSeqno() const {
370370
return end_seqno_;
371371
}
372372

373-
uint32_t getFlags() {
373+
uint32_t getFlags() const {
374374
return flags_;
375375
}
376376

@@ -554,18 +554,13 @@ class MutationConsumerMessage : public MutationResponse {
554554
emd(e) {
555555
}
556556

557-
// Used in test code for wiring a producer/consumer directly
558-
MutationConsumerMessage(MutationResponse& response)
559-
: MutationResponse(response.getItem(),
560-
response.getOpaque(),
561-
response.getIncludeValue(),
562-
response.getIncludeXattrs(),
563-
response.getIncludeDeleteTime(),
564-
response.getDocKeyEncodesCollectionId(),
565-
response.getEnableExpiryOutput(),
566-
response.getStreamId()),
567-
emd(nullptr) {
568-
}
557+
/**
558+
* Used in test code for wiring a producer/consumer directly.
559+
* @param response MutationResponse (from Producer) to create message from.
560+
* Note this is non-const as the item needs to be modified for some
561+
* Event types.
562+
*/
563+
MutationConsumerMessage(MutationResponse& response);
569564

570565
/**
571566
* @return size of message on the wire

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ class MockDcpProducer : public DcpProducer {
8484
return enableExpiryOpcode;
8585
}
8686

87+
void setSyncReplication(bool value) { // Mock-only setter: overwrites supportsSyncReplication with `value`.
88+
supportsSyncReplication = value;
89+
}
8790
/**
8891
* Create the ActiveStreamCheckpointProcessorTask and assign to
8992
* checkpointCreator->task

0 commit comments

Comments
 (0)