3838using Node = int ;
3939static Node Node0 = 0 ;
4040static Node Node1 = 1 ;
41+ static Node Node2 = 2 ;
42+ static Node Node3 = 3 ;
4143
4244static TaskQueue* getLpAuxQ () {
4345 auto * task_executor =
@@ -87,7 +89,7 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
8789
8890 void createNode (Node node, vbucket_state_t vbState) {
8991 ASSERT_NE (Node0, node) << " Cannot re-create Node0" ;
90- ASSERT_LE (node, Node1 ) << " Out of bounds for Node" << node;
92+ ASSERT_LE (node, Node3 ) << " Out of bounds for Node" << node;
9193
9294 std::string config = config_string;
9395 if (config.size () > 0 ) {
@@ -317,9 +319,9 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
317319
318320 // engines is a 'map' from Node to an engine pointer; currently Node0 is the
319321 // engine created by the parent class and Node1..Node3 are created by this
320- // class. Node1 is always created by SetUp.
321- // @todo: expand the storage so we can have more nodes
322- std::array<SynchronousEPEngine*, 2 > engines;
322+ // class. Node1 is always created by SetUp and additional nodes are created on
323+ // demand.
324+ std::array<SynchronousEPEngine*, 4 > engines;
323325
324326 // Owned pointers to the other engines, created on demand by tests
325327 std::vector<SynchronousEPEngineUniquePtr> extraEngines;
@@ -737,3 +739,122 @@ TEST_F(DCPLoopbackStreamTest, InMemoryAndBackfillDuplicatePrepares) {
737739
738740 flushNodeIfPersistent (Node1);
739741}
742+
743+ // This test validates that a replica which receives a partial disk snapshot
744+ // is rolled back to before that partial snapshot during failover. Prior to this
745+ // test it was not clear if DCP would incorrectly resume the replica from beyond
746+ // the partial snapshot start point; however, the test proved that the way that
747+ // KV calculates a disk snapshot marker is what ensures the consumer's
748+ // post-failover stream request is rejected.
749+ TEST_F (DCPLoopbackStreamTest, MultiReplicaPartialSnapshot) {
750+ // The keys we will use
751+ auto k1 = makeStoredDocKey (" k1" );
752+ auto k2 = makeStoredDocKey (" k2" );
753+ auto k3 = makeStoredDocKey (" k3" );
754+ auto k4 = makeStoredDocKey (" k4" );
755+ auto k5 = makeStoredDocKey (" k5" );
756+
757+ // setup Node2 so we have two replicas
758+ createNode (Node2, vbucket_state_replica);
759+
760+ auto route0_1 = createDcpRoute (Node0, Node1);
761+ auto route0_2 = createDcpRoute (Node0, Node2);
762+ EXPECT_EQ (cb::engine_errc::success, route0_1.doStreamRequest ().first );
763+ EXPECT_EQ (cb::engine_errc::success, route0_2.doStreamRequest ().first );
764+
765+ // Setup the active, first move the active away from seqno 0 with a couple
766+ // of keys, we don't really care about these in this test
767+ EXPECT_EQ (ENGINE_SUCCESS, storeSet (k1));
768+ EXPECT_EQ (ENGINE_SUCCESS, storeSet (k2));
769+ flushVBucketToDiskIfPersistent (vbid, 2 );
770+ // These go everywhere...
771+ route0_1.transferSnapshotMarker (0 , 2 , MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
772+ route0_1.transferMutation (k1, 1 );
773+ route0_1.transferMutation (k2, 2 );
774+ route0_2.transferSnapshotMarker (0 , 2 , MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
775+ route0_2.transferMutation (k1, 1 );
776+ route0_2.transferMutation (k2, 2 );
777+
778+ // Now setup the interesting operations, and build the replicas as we go.
779+ EXPECT_EQ (ENGINE_SUCCESS, storeSet (k3));
780+ EXPECT_EQ (ENGINE_SUCCESS, storeSet (k4));
781+ flushVBucketToDiskIfPersistent (vbid, 2 );
782+
783+ // And replicate the snapshot to replica on Node1
784+ route0_1.transferSnapshotMarker (3 , 4 , MARKER_FLAG_MEMORY);
785+ route0_1.transferMutation (k3, 3 );
786+ route0_1.transferMutation (k4, 4 );
787+ flushNodeIfPersistent (Node1);
788+
789+ // Simulate disconnect of route0_2 Node0->Node2
790+ route0_2.destroy ();
791+
792+ auto vb = engines[Node0]->getVBucket (vbid);
793+ // Next snapshot, *important* k3 is set again and in a new checkpoint
794+ vb->checkpointManager ->createNewCheckpoint ();
795+ EXPECT_EQ (ENGINE_SUCCESS, storeSet (k5));
796+ EXPECT_EQ (ENGINE_SUCCESS, storeSet (k3));
797+ flushVBucketToDiskIfPersistent (vbid, 2 );
798+
799+ // And replicate a partial snapshot to the replica on Node1
800+ route0_1.transferSnapshotMarker (5 , 6 , MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
801+ route0_1.transferMutation (k5, 5 );
802+ // k3@6 doesn't transfer
803+ flushNodeIfPersistent (Node1);
804+
805+ // brute force... ensure in-memory is now purged so our new stream backfills
806+ vb->checkpointManager ->clear (vbucket_state_active);
807+
808+ // Now reconnect Node0/Node2
809+ auto route0_2_new = createDcpRoute (Node0, Node2);
810+ EXPECT_EQ (cb::engine_errc::success, route0_2_new.doStreamRequest ().first );
811+ runBackfill ();
812+
813+ // Now transfer the disk snapshot, again partial, leave the last key.
814+ // NOTE: This is the important snapshot which ensures our consumer later
815+ // stream-requests and rolls back to before this partial snapshot. What
816+ // is special about disk snapshots is the start-seqno is the stream-request
817+ // start seqno. Only disk snapshots would do that, in-memory snapshots
818+ // always set the marker.start to be the first seqno the ActiveStream pushes
819+ // to the readyQueue regardless of what the stream-request start-seqno was.
820+ route0_2_new.transferSnapshotMarker (
821+ 2 , 6 , MARKER_FLAG_DISK | MARKER_FLAG_CHK);
822+ route0_2_new.transferMutation (k4, 4 ); // transfer k4
823+ flushNodeIfPersistent (Node2);
824+ route0_2_new.transferMutation (k5, 5 ); // transfer k5
825+ flushNodeIfPersistent (Node2);
826+ // but not k3@6
827+
828+ // DISASTER. NODE0 dies...
829+ // DCP crashes
830+ route0_1.destroy ();
831+ route0_2_new.destroy ();
832+ // NODE1 promoted
833+ EXPECT_EQ (ENGINE_SUCCESS,
834+ engines[Node1]->getKVBucket ()->setVBucketState (
835+ vbid, vbucket_state_active));
836+
837+ flushNodeIfPersistent (Node1);
838+
839+ // New topology
840+ // NODE1 active -> NODE2, NODE3
841+ createNode (Node3, vbucket_state_replica); // bring node3 into the test
842+ auto route1_2 = createDcpRoute (Node1, Node2);
843+ auto route1_3 = createDcpRoute (Node1, Node3);
844+ auto rollback = route1_2.doStreamRequest ();
845+ EXPECT_EQ (cb::engine_errc::rollback, rollback.first );
846+
847+ // The existing replica which connects to Node1 has to go back to seqno:2
848+ // and must rebuild the partial snapshot again
849+ EXPECT_EQ (2 , rollback.second );
850+
851+ // The new node joins successfully and builds a replica from 0
852+ EXPECT_EQ (cb::engine_errc::success, route1_3.doStreamRequest ().first );
853+ route1_3.transferSnapshotMarker (0 , 4 , MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
854+ route1_3.transferMutation (k1, 1 );
855+ route1_3.transferMutation (k2, 2 );
856+ route1_3.transferMutation (k3, 3 );
857+ route1_3.transferMutation (k4, 4 );
858+ route1_3.transferSnapshotMarker (5 , 5 , MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
859+ route1_3.transferMutation (k5, 5 );
860+ }
0 commit comments