@@ -4650,6 +4650,96 @@ TEST_P(SingleThreadedPassiveStreamTest, GetSnapshotInfo) {
46504650 EXPECT_EQ (snapshot_range_t (2 , 2 ), snapInfo.range );
46514651}
46524652
4653+ // Note: At the moment this test's purpose is to show how outbound DCP behaves
4654+ // at replica.
4655+ // @todo MB-59288
4656+ TEST_P (SingleThreadedPassiveStreamTest, BackfillSnapshotFromPartialReplica) {
4657+ auto & vb = *store->getVBucket (vbid);
4658+ ASSERT_EQ (0 , vb.getHighSeqno ());
4659+ auto & manager = static_cast <MockCheckpointManager&>(*vb.checkpointManager );
4660+ ASSERT_EQ (1 , manager.getNumCheckpoints ());
4661+ ASSERT_EQ (2 , manager.getNumOpenChkItems ()); // cs, vbs
4662+ removeCheckpoint (vb);
4663+ ASSERT_EQ (0 , vb.getHighSeqno ());
4664+ ASSERT_EQ (1 , manager.getNumCheckpoints ());
4665+ ASSERT_EQ (1 , manager.getNumOpenChkItems ()); // cs
4666+ const auto & list = manager.getCheckpointList ();
4667+ ASSERT_FALSE (list.back ()->modifiedByExpel ());
4668+ // The stream isn't in any snapshot (no data received)
4669+ auto snapInfo = manager.getSnapshotInfo ();
4670+ EXPECT_EQ (0 , snapInfo.start );
4671+ EXPECT_EQ (snapshot_range_t (0 , 0 ), snapInfo.range );
4672+ // Same on disk
4673+ auto & underlying = *store->getRWUnderlying (vbid);
4674+ ASSERT_EQ (0 , underlying.getLastPersistedSeqno (vbid));
4675+ auto vbstate = underlying.getPersistedVBucketState (vbid);
4676+ ASSERT_EQ (0 , vbstate.state .lastSnapStart );
4677+ ASSERT_EQ (0 , vbstate.state .lastSnapEnd );
4678+ const uint64_t opaque = 1 ;
4679+ EXPECT_EQ (cb::engine_errc::success,
4680+ consumer->snapshotMarker (opaque,
4681+ vbid,
4682+ 1 , // start
4683+ 2 , // end
4684+ MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
4685+ {},
4686+ {}));
4687+ ASSERT_EQ (0 , underlying.getLastPersistedSeqno (vbid));
4688+ vbstate = underlying.getPersistedVBucketState (vbid);
4689+ ASSERT_EQ (0 , vbstate.state .lastSnapStart );
4690+ ASSERT_EQ (0 , vbstate.state .lastSnapEnd );
4691+ const auto key = makeStoredDocKey (" key" );
4692+ EXPECT_EQ (cb::engine_errc::success,
4693+ consumer->mutation (opaque,
4694+ key,
4695+ {},
4696+ 0 ,
4697+ 0 ,
4698+ 0 ,
4699+ vbid,
4700+ 0 ,
4701+ 1 , // seqno
4702+ 0 ,
4703+ 0 ,
4704+ 0 ,
4705+ {},
4706+ 0 ));
4707+ EXPECT_EQ (1 , vb.getHighSeqno ());
4708+ EXPECT_EQ (2 , manager.getNumOpenChkItems ()); // cs, mut
4709+ // The stream is in a partial snapshot here
4710+ snapInfo = manager.getSnapshotInfo ();
4711+ EXPECT_EQ (1 , snapInfo.start );
4712+ EXPECT_EQ (snapshot_range_t (1 , 2 ), snapInfo.range );
4713+ // Disk too
4714+ flushVBucketToDiskIfPersistent (vbid);
4715+ ASSERT_EQ (1 , underlying.getLastPersistedSeqno (vbid));
4716+ vbstate = underlying.getPersistedVBucketState (vbid);
4717+ ASSERT_EQ (0 , vbstate.state .lastSnapStart );
4718+ ASSERT_EQ (2 , vbstate.state .lastSnapEnd );
4719+ // Trigger an outbound backfill
4720+ removeCheckpoint (vb);
4721+ auto producer = std::make_shared<MockDcpProducer>(
4722+ *engine, cookie, " test_producer->test_consumer" , 0 , false );
4723+ auto activeStream = std::make_shared<MockActiveStream>(
4724+ engine.get (), producer, 0 , 0 , vb);
4725+ activeStream->setActive ();
4726+ ASSERT_TRUE (activeStream->isBackfilling ());
4727+ auto & readyQ = activeStream->public_readyQ ();
4728+ ASSERT_EQ (0 , readyQ.size ());
4729+ // Core test
4730+ // Backfill generates a [0, 1] complete snapshot even if on disk replica is
4731+ // in a partial [0, 2] snapshot.
4732+ auto & lpAuxioQ = *task_executor->getLpTaskQ ()[AUXIO_TASK_IDX];
4733+ runNextTask (lpAuxioQ); // init
4734+ ASSERT_EQ (1 , readyQ.size ());
4735+ auto resp = activeStream->next (*producer);
4736+ ASSERT_TRUE (resp);
4737+ EXPECT_EQ (DcpResponse::Event::SnapshotMarker, resp->getEvent ());
4738+ auto snapMarker = dynamic_cast <SnapshotMarker&>(*resp);
4739+ EXPECT_EQ (0 , snapMarker.getStartSeqno ());
4740+ EXPECT_EQ (1 , snapMarker.getEndSeqno ());
4741+ }
4742+
46534743/* *
46544744 * MB-38444: We fix an Ephemeral-only bug, but test covers Persistent bucket too
46554745 */
0 commit comments