@@ -1380,10 +1380,99 @@ void DurabilityPassiveStreamTest::testReceiveDuplicateDcpPrepare(
13801380 opaque, vbid, prepareSeqno, commitSeqno, key)));
13811381}
13821382
1383+ void DurabilityPassiveStreamTest::testReceiveMultipleDuplicateDcpPrepares () {
1384+ // This simulates the state in which the active has:
1385+ // PRE1 PRE2 PRE3 CMT1 CMT2 CMT3 PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
1386+ // the replica sees:
1387+ // PRE1 PRE2 PRE3 ||Disconnect|| PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
1388+ // All 3 duplicate prepares should be accepted by
1389+ // allowedDuplicatePrepareSeqnos
1390+ const uint64_t cas = 999 ;
1391+ uint64_t seqno = 1 ;
1392+ std::vector<StoredDocKey> keys = {makeStoredDocKey (" key1" ),
1393+ makeStoredDocKey (" key2" ),
1394+ makeStoredDocKey (" key3" )};
1395+
1396+ // Do first prepare for each of three keys
1397+ // PRE1 PRE2 PRE3 CMT1 CMT2 CMT3 PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
1398+ // ^^^^ ^^^^ ^^^^
1399+ std::vector<queued_item> queued_items;
1400+ for (const auto & key : keys) {
1401+ queued_items.push_back (makeAndReceiveDcpPrepare (key, cas, seqno++));
1402+ }
1403+
1404+ // The consumer now "disconnects" then "re-connects" and misses the commits
1405+ // at seqnos 4, 5, 6.
1406+ // PRE1 PRE2 PRE3 CMT1 CMT2 CMT3 PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
1407+ // xxxx xxxx xxxx
1408+ // It instead receives the following snapshot [7, 9] containing prepares
1409+ // (for the same 3 keys), followed by a second snapshot [10, 12] with the
1410+ // corresponding commits.
1411+ uint32_t opaque = 0 ;
1412+
1413+ // Fake disconnect and reconnect, importantly, this sets up the valid window
1414+ // for replacing the old prepare.
1415+ consumer->closeAllStreams ();
1416+ consumer->addStream (opaque, vbid, 0 /* flags*/ );
1417+ stream = static_cast <MockPassiveStream*>(
1418+ (consumer->getVbucketStream (vbid)).get ());
1419+ stream->acceptStream (cb::mcbp::Status::Success, opaque);
1420+
1421+ ASSERT_TRUE (stream->isActive ());
1422+ // At Replica we don't expect multiple Durability items (for the same key)
1423+ // within the same snapshot. That is because the Active prevents that for
1424+ // avoiding de-duplication.
1425+ // So, we need to simulate a Producer sending another SnapshotMarker with
1426+ // the MARKER_FLAG_CHK set before the Consumer receives the Commit. That
1427+ // will force the Consumer closing the open checkpoint (which Contains the
1428+ // Prepare) and creating a new open one for queueing the Commit.
1429+ SnapshotMarker marker (
1430+ opaque,
1431+ vbid,
1432+ 7 /* snapStart*/ ,
1433+ 9 /* snapEnd*/ ,
1434+ dcp_marker_flag_t ::MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
1435+ {} /* streamId*/ );
1436+ stream->processMarker (&marker);
1437+
1438+ // Do second prepare for each of three keys
1439+ // PRE1 PRE2 PRE3 CMT1 CMT2 CMT3 PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
1440+ // ^^^^ ^^^^ ^^^^
1441+ seqno = 7 ;
1442+ for (const auto & key : keys) {
1443+ queued_items.push_back (makeAndReceiveDcpPrepare (key, cas, seqno++));
1444+ }
1445+
1446+ marker = SnapshotMarker (
1447+ opaque,
1448+ vbid,
1449+ 10 /* snapStart*/ ,
1450+ 12 /* snapEnd*/ ,
1451+ dcp_marker_flag_t ::MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
1452+ {} /* streamId*/ );
1453+ stream->processMarker (&marker);
1454+
1455+ // Commit each of the keys
1456+ // PRE1 PRE2 PRE3 CMT1 CMT2 CMT3 PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
1457+ // ^^^^ ^^^^ ^^^^
1458+
1459+ uint64_t prepareSeqno = 7 ;
1460+ seqno = 10 ;
1461+ for (const auto & key : keys) {
1462+ ASSERT_EQ (ENGINE_SUCCESS,
1463+ stream->messageReceived (std::make_unique<CommitSyncWrite>(
1464+ opaque, vbid, prepareSeqno++, seqno++, key)));
1465+ }
1466+ }
1467+
13831468TEST_P (DurabilityPassiveStreamTest, ReceiveDuplicateDcpPrepare) {
13841469 testReceiveDuplicateDcpPrepare (3 );
13851470}
13861471
1472+ TEST_P (DurabilityPassiveStreamTest, ReceiveMultipleDuplicateDcpPrepares) {
1473+ testReceiveMultipleDuplicateDcpPrepares ();
1474+ }
1475+
13871476TEST_P (DurabilityPassiveStreamTest, ReceiveDuplicateDcpPrepareRemoveFromSet) {
13881477 testReceiveDuplicateDcpPrepare (3 );
13891478
0 commit comments