@@ -4740,6 +4740,132 @@ TEST_P(SingleThreadedPassiveStreamTest, BackfillSnapshotFromPartialReplica) {
47404740 EXPECT_EQ (1 , snapMarker.getEndSeqno ());
47414741}
47424742
4743+ // Note: At the moment this test's purpose is to show how outbound DCP behaves
4744+ // at replica.
4745+ // @todo MB-59288
4746+ TEST_P (SingleThreadedPassiveStreamTest, MemorySnapshotFromPartialReplica) {
4747+ auto & vb = *store->getVBucket (vbid);
4748+ ASSERT_EQ (0 , vb.getHighSeqno ());
4749+ auto & manager = static_cast <MockCheckpointManager&>(*vb.checkpointManager );
4750+ ASSERT_EQ (1 , manager.getNumCheckpoints ());
4751+ ASSERT_EQ (2 , manager.getNumOpenChkItems ()); // cs, vbs
4752+
4753+ removeCheckpoint (vb);
4754+ ASSERT_EQ (0 , vb.getHighSeqno ());
4755+ ASSERT_EQ (1 , manager.getNumCheckpoints ());
4756+ ASSERT_EQ (1 , manager.getNumOpenChkItems ()); // cs
4757+ const auto & list = manager.getCheckpointList ();
4758+ ASSERT_FALSE (list.back ()->modifiedByExpel ());
4759+
4760+ // The stream isn't in any snapshot (no data received)
4761+ auto snapInfo = manager.getSnapshotInfo ();
4762+ EXPECT_EQ (0 , snapInfo.start );
4763+ EXPECT_EQ (snapshot_range_t (0 , 0 ), snapInfo.range );
4764+
4765+ const uint64_t opaque = 1 ;
4766+ EXPECT_EQ (cb::engine_errc::success,
4767+ consumer->snapshotMarker (opaque,
4768+ vbid,
4769+ 1 , // start
4770+ 2 , // end
4771+ MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
4772+ {},
4773+ {}));
4774+
4775+ const auto key = makeStoredDocKey (" key" );
4776+ EXPECT_EQ (cb::engine_errc::success,
4777+ consumer->mutation (opaque,
4778+ key,
4779+ {},
4780+ 0 ,
4781+ 0 ,
4782+ 0 ,
4783+ vbid,
4784+ 0 ,
4785+ 1 , // seqno
4786+ 0 ,
4787+ 0 ,
4788+ 0 ,
4789+ {},
4790+ 0 ));
4791+
4792+ EXPECT_EQ (1 , vb.getHighSeqno ());
4793+ EXPECT_EQ (2 , manager.getNumOpenChkItems ()); // cs, mut
4794+
4795+ // The stream is in a partial snapshot here
4796+ snapInfo = manager.getSnapshotInfo ();
4797+ EXPECT_EQ (1 , snapInfo.start );
4798+ EXPECT_EQ (snapshot_range_t (1 , 2 ), snapInfo.range );
4799+
4800+ // Open an outbound stream from replica.
4801+ auto producer = std::make_shared<MockDcpProducer>(
4802+ *engine, cookie, " test_producer->test_consumer" , 0 , false );
4803+ producer->createCheckpointProcessorTask ();
4804+ producer->scheduleCheckpointProcessorTask ();
4805+ auto activeStream = std::make_shared<MockActiveStream>(
4806+ engine.get (), producer, 0 , 0 , vb);
4807+ activeStream->setActive ();
4808+ ASSERT_TRUE (activeStream->isInMemory ());
4809+ auto & readyQ = activeStream->public_readyQ ();
4810+ ASSERT_EQ (0 , readyQ.size ());
4811+
4812+ // Core test
4813+ // I would expect that checkpoint generates a [0, 2] partial snapshot, only
4814+ // seqno:1 in checkpoint.
4815+ activeStream->nextCheckpointItemTask ();
4816+ ASSERT_EQ (2 , readyQ.size ());
4817+ // marker
4818+ auto resp = activeStream->next (*producer);
4819+ ASSERT_TRUE (resp);
4820+ EXPECT_EQ (DcpResponse::Event::SnapshotMarker, resp->getEvent ());
4821+ auto * snapMarker = dynamic_cast <SnapshotMarker*>(resp.get ());
4822+ EXPECT_EQ (0 , snapMarker->getStartSeqno ());
4823+ EXPECT_EQ (1 , snapMarker->getEndSeqno ()); // @todo ??? I would expect 2
4824+ // seqno:1
4825+ resp = activeStream->next (*producer);
4826+ ASSERT_TRUE (resp);
4827+ EXPECT_EQ (DcpResponse::Event::Mutation, resp->getEvent ());
4828+ auto * mut = dynamic_cast <MutationResponse*>(resp.get ());
4829+ EXPECT_EQ (1 , mut->getBySeqno ());
4830+
4831+ // Now replica receives the snapEnd mutation
4832+ EXPECT_EQ (cb::engine_errc::success,
4833+ consumer->mutation (opaque,
4834+ key,
4835+ {},
4836+ 0 ,
4837+ 0 ,
4838+ 0 ,
4839+ vbid,
4840+ 0 ,
4841+ 2 , // seqno
4842+ 0 ,
4843+ 0 ,
4844+ 0 ,
4845+ {},
4846+ 0 ));
4847+
4848+ // Core test
4849+ // There's only seqno:2 remaining for the stream in checkpoint. I would
4850+ // expect that just that mutation is sent for completing the snapshot that
4851+ // we have partially sent to the peer.
4852+ activeStream->nextCheckpointItemTask ();
4853+ ASSERT_EQ (2 , readyQ.size ()); // @todo ??? I would expect 1
4854+ // marker @todo ??? I would expect no marker
4855+ resp = activeStream->next (*producer);
4856+ ASSERT_TRUE (resp);
4857+ EXPECT_EQ (DcpResponse::Event::SnapshotMarker, resp->getEvent ());
4858+ snapMarker = dynamic_cast <SnapshotMarker*>(resp.get ());
4859+ EXPECT_EQ (2 , snapMarker->getStartSeqno ());
4860+ EXPECT_EQ (2 , snapMarker->getEndSeqno ());
4861+ // seqno:2
4862+ resp = activeStream->next (*producer);
4863+ ASSERT_TRUE (resp);
4864+ EXPECT_EQ (DcpResponse::Event::Mutation, resp->getEvent ());
4865+ mut = dynamic_cast <MutationResponse*>(resp.get ());
4866+ EXPECT_EQ (2 , mut->getBySeqno ());
4867+ }
4868+
47434869/* *
47444870 * MB-38444: We fix an Ephemeral-only bug, but test covers Persistent bucket too
47454871 */
0 commit comments