Skip to content

Commit 50fa82a

Browse files
committed
MB-35908: Compare cas instead of seqno when deduping setVBState
The flusher currently compares the seqnos of set_vbucket_state items to determine which is the most recent vbucket state (i.e. the one that should be persisted). This is an issue as set_vbucket_state operations do not update the lastBySeqno of the vBucket, which means that subsequent set_vbucket_state items may have the same seqno. It's not valid to update the lastBySeqno of the vBucket for two reasons: the replica seqnos would then differ from the active's (consider setting the failover table on DCP stream acceptance), and DCP Consumers may expect to see a seqno that will never be sent. Instead of the seqno, compare the CAS of the set_vbucket_state items to determine which is the most recent, as the CAS will be unique. Change-Id: Id17e03b635710c04e195e3ff77e407a065e112cd Reviewed-on: http://review.couchbase.org/114822 Reviewed-by: Dave Rigby <[email protected]> Reviewed-by: Jim Walker <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 945ae4b commit 50fa82a

File tree

3 files changed

+64
-10
lines changed

3 files changed

+64
-10
lines changed

engines/ep/src/checkpoint_manager.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,12 @@ void CheckpointManager::queueSetVBState(VBucket& vb) {
787787
queued_item item = createCheckpointItem(/*id*/0, vbucketId,
788788
queue_op::set_vbucket_state);
789789

790+
// We need to set the cas of the item as two subsequent set_vbucket_state
791+
// items will have the same seqno and the flusher needs a way to determine
792+
// which is the latest so that we persist the correct state.
793+
// We do this 'atomically' as we are holding the ::queueLock.
794+
item->setCas(vb.nextHLCCas());
795+
790796
// Store a JSON version of the vbucket transition data in the value
791797
vbstate.toItem(*item);
792798

@@ -1184,6 +1190,14 @@ queued_item CheckpointManager::createCheckpointItem(uint64_t id,
11841190
bySeqno = lastBySeqno;
11851191
break;
11861192
case queue_op::set_vbucket_state:
1193+
// It's not valid to actually increment lastBySeqno for a
1194+
// set_vbucket_state for two reasons:
1195+
// 1) This may be called independently on the replica to the active
1196+
// (i.e. for a failover table change) so the seqnos would differ to
1197+
// those on the active.
1198+
// 2) DcpConsumer calling getAllVBucketSeqnos would expect to see a
1199+
// seqno that will never be sent to them if the last item queued is a
1200+
// set_vbucket_state.
11871201
bySeqno = lastBySeqno + 1;
11881202
break;
11891203

engines/ep/src/ep_bucket.cc

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
397397
}
398398
// We need to set a few values from the in-memory state.
399399
uint64_t maxSeqno = 0;
400-
uint64_t maxVbStateOperation = 0;
400+
uint64_t maxVbStateOpCas = 0;
401401

402402
auto minSeqno = std::numeric_limits<uint64_t>::max();
403403

@@ -460,21 +460,31 @@ std::pair<bool, size_t> EPBucket::flushVBucket(Vbid vbid) {
460460
}
461461

462462
if (op == queue_op::set_vbucket_state) {
463-
// Only process vbstate if it's sequenced higher
464-
if (static_cast<uint64_t>(item->getBySeqno()) >
465-
maxVbStateOperation) {
466-
maxVbStateOperation = item->getBySeqno();
463+
// Only process vbstate if it's sequenced higher (by cas).
464+
// We use the cas instead of the seqno here because a
465+
// set_vbucket_state does not increment the lastBySeqno in
466+
// the CheckpointManager when it is created. This means that
467+
// it is possible to have two set_vbucket_state items that
468+
// follow one another with the same seqno. The cas will be
469+
// bumped for every item so it can be used to distinguish
470+
// which item is the latest and should be flushed.
471+
if (item->getCas() > maxVbStateOpCas) {
472+
// Should only bump the stat once for the latest state
473+
// change that we want to flush
474+
if (maxVbStateOpCas == 0) {
475+
// There is at least a commit to be done, so
476+
// increase todo
477+
++stats.flusher_todo;
478+
}
479+
480+
maxVbStateOpCas = item->getCas();
467481

468482
// It could be the case that the set_vbucket_state is
469483
// alone, i.e. no mutations are being flushed, we must
470-
// trigger an update of the vbstatem which will always
484+
// trigger an update of the vbstate, which will always
471485
// happen when we set this.
472486
mustCheckpointVBState = true;
473487

474-
// There is at least a commit to be done, so increase
475-
// todo
476-
++stats.flusher_todo;
477-
478488
// Process the Item's value into the transition struct
479489
vbstate.transition.fromItem(*item);
480490
}

engines/ep/tests/module_tests/evp_store_warmup_test.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "ep_time.h"
2727
#include "evp_store_durability_test.h"
2828
#include "evp_store_single_threaded_test.h"
29+
#include "failover-table.h"
2930
#include "kvstore.h"
3031
#include "programs/engine_testapp/mock_server.h"
3132
#include "test_helpers.h"
@@ -551,6 +552,35 @@ TEST_F(WarmupTest, SetVBState) {
551552
EXPECT_EQ(vbucket_state_replica, store->getVBucket(vbid)->getState());
552553
}
553554

555+
TEST_F(WarmupTest, TwoStateChangesAtSameSeqno) {
556+
// 1) Do a normal state change to replica
557+
EXPECT_EQ(ENGINE_SUCCESS,
558+
store->setVBucketState(vbid, vbucket_state_replica));
559+
560+
// 2) Make a change to the failover table as we would if we had just created
561+
// a stream to an active node
562+
auto failoverTableLastSeqno = 999;
563+
{ // VBPtr scope (not valid after warmup)
564+
auto vb = store->getVBucket(vbid);
565+
vb->failovers->createEntry(failoverTableLastSeqno);
566+
EXPECT_EQ(2, vb->failovers->getNumEntries());
567+
EXPECT_EQ(failoverTableLastSeqno,
568+
vb->failovers->getLatestEntry().by_seqno);
569+
}
570+
store->scheduleVBStatePersist(vbid);
571+
572+
// 3) Flush the two state changes together and warmup so that we can more
573+
// easily read what was persisted
574+
flush_vbucket_to_disk(vbid, 0);
575+
resetEngineAndWarmup();
576+
577+
// Test
578+
auto vb = store->getVBucket(vbid);
579+
auto state = store->getVBucket(vbid)->getTransitionState();
580+
EXPECT_EQ(vbucket_state_replica, vb->getState());
581+
EXPECT_EQ(failoverTableLastSeqno, vb->failovers->getLatestEntry().by_seqno);
582+
}
583+
554584
// Test uses a sequence of operations that will mean delete is de-duplicated
555585
// resulting in the flusher missing the update to the on-disk max-deleted
556586
// revision.

0 commit comments

Comments
 (0)