@@ -4083,6 +4083,113 @@ TEST_P(DurabilityPassiveStreamTest,
40834083 stream->processBufferedMessages (processedBytes, 1 ));
40844084}
40854085
4086+ void DurabilityPassiveStreamTest::testPrepareDeduplicationCorrectlyResetsHPS (
4087+ cb::durability::Level level) {
4088+ using namespace cb ::durability;
4089+ if (ephemeral ()) {
4090+ ASSERT_EQ (Level::Majority, level);
4091+ }
4092+
4093+ auto keyA = makeStoredDocKey (" keyA" );
4094+ makeAndReceiveDcpPrepare (keyA, 1 /* cas*/ , 1 /* prepareSeqno*/ , level);
4095+ auto keyB = makeStoredDocKey (" keyB" );
4096+ makeAndReceiveDcpPrepare (keyB, 2 /* cas*/ , 2 /* prepareSeqno*/ , level);
4097+ flushVBucketToDiskIfPersistent (vbid, 2 /* expected_num_flushed*/ );
4098+
4099+ // Expected state in PDM:
4100+ //
4101+ // P(keyA):1 P(keyB):2
4102+ // ^
4103+ // HPS:2
4104+ const auto & vb = *store->getVBucket (vbid);
4105+ const auto & pdm = dynamic_cast <const PassiveDurabilityMonitor&>(
4106+ vb.getDurabilityMonitor ());
4107+ EXPECT_EQ (2 , pdm.getNumTracked ());
4108+ EXPECT_EQ (2 , pdm.getHighestTrackedSeqno ());
4109+ EXPECT_EQ (2 , pdm.getHighPreparedSeqno ());
4110+
4111+ // Replica receives a disk snapshot now, key duplicates legal in PDM
4112+ const uint32_t opaque = 0 ;
4113+ SnapshotMarker marker (opaque,
4114+ vbid,
4115+ 3 /* snapStart*/ ,
4116+ 3 /* snapEnd*/ ,
4117+ MARKER_FLAG_DISK | MARKER_FLAG_CHK /* flags*/ ,
4118+ 0 /* HCS*/ ,
4119+ {} /* maxVisibleSeqno*/ ,
4120+ {} /* streamId*/ );
4121+ stream->processMarker (&marker);
4122+
4123+ // Before the fix, the next steps will end up with the following invalid
4124+ // state, which is a pre-requirement for the next steps to fail:
4125+ //
4126+ // P(keyA):1 x P(keyB):3
4127+ // ^
4128+ // HPS:2
4129+ //
4130+ // At fix, this is the state:
4131+ //
4132+ // P(keyA):1 x P(keyB):3
4133+ // ^
4134+ // HPS:2
4135+
4136+ auto qi = makePendingItem (keyB, " value" , Requirements (level, Timeout (60 )));
4137+ qi->setBySeqno (3 );
4138+ qi->setCas (3 );
4139+ // Receiving the snap-end seqno. Before the fix, this is where we fail on
4140+ // Ephemeral, as we callback into the PDM for updating the HPS from
4141+ // PassiveStream::handleSnapshotEnd().
4142+ // The PDM throws as we break the monotonicity invariant on HPS (set to
4143+ // seqno:2) by trying to reset it to PDM::trackedWrites::begin (ie, seqno:1)
4144+ EXPECT_EQ (ENGINE_SUCCESS,
4145+ stream->messageReceived (std::make_unique<MutationConsumerMessage>(
4146+ qi,
4147+ opaque,
4148+ IncludeValue::Yes,
4149+ IncludeXattrs::Yes,
4150+ IncludeDeleteTime::No,
4151+ IncludeDeletedUserXattrs::Yes,
4152+ DocKeyEncodesCollectionId::No,
4153+ nullptr ,
4154+ cb::mcbp::DcpStreamId{})));
4155+
4156+ if (persistent ()) {
4157+ EXPECT_EQ (2 , pdm.getNumTracked ());
4158+ EXPECT_EQ (3 , pdm.getHighestTrackedSeqno ());
4159+ EXPECT_EQ (2 , pdm.getHighPreparedSeqno ());
4160+
4161+ // At flush we persist the full disk snapshot and we call back into the
4162+ // PDM for moving the HPS. Before the fix the PDM throws here the same
4163+ // as already described for Ephemeral.
4164+ flush_vbucket_to_disk (vbid, 1 /* expected_num_flushed*/ );
4165+ }
4166+
4167+ // Final state at fix
4168+ //
4169+ // P(keyA):1 x P(keyB):3
4170+ // ^
4171+ // HPS:3
4172+ EXPECT_EQ (2 , pdm.getNumTracked ());
4173+ EXPECT_EQ (3 , pdm.getHighestTrackedSeqno ());
4174+ EXPECT_EQ (3 , pdm.getHighPreparedSeqno ());
4175+ }
4176+
4177+ TEST_P (DurabilityPassiveStreamTest, PrepareDedupCorrectlyResetsHPS_Majority) {
4178+ testPrepareDeduplicationCorrectlyResetsHPS (cb::durability::Level::Majority);
4179+ }
4180+
4181+ TEST_P (DurabilityPassiveStreamPersistentTest,
4182+ PrepareDedupCorrectlyResetsHPS_MajorityAndPersistOnMaster) {
4183+ testPrepareDeduplicationCorrectlyResetsHPS (
4184+ cb::durability::Level::MajorityAndPersistOnMaster);
4185+ }
4186+
4187+ TEST_P (DurabilityPassiveStreamPersistentTest,
4188+ PrepareDedupCorrectlyResetsHPS_PersistToMajority) {
4189+ testPrepareDeduplicationCorrectlyResetsHPS (
4190+ cb::durability::Level::PersistToMajority);
4191+ }
4192+
40864193void DurabilityPromotionStreamTest::SetUp () {
40874194 // Set up as a replica
40884195 DurabilityPassiveStreamTest::SetUp ();
0 commit comments