@@ -4344,10 +4344,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest,
43444344 cb::durability::Level::PersistToMajority);
43454345}
43464346
4347- // @todo MB-31869: review
4348- TEST_P (DurabilityPassiveStreamPersistentTest, BufferDcpCommit) {
4349- auto key = makeStoredDocKey (" bufferDcp" );
4350-
4347+ TEST_P (DurabilityPassiveStreamPersistentTest, FlowControlUnackedDcpCommit) {
43514348 // Messages go into the consumer so we update flow-control
43524349 EXPECT_EQ (cb::engine_errc::success,
43534350 consumer->snapshotMarker (stream->getOpaque (),
@@ -4363,6 +4360,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpCommit) {
43634360 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2xPayload) +
43644361 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2_0Value),
43654362 ackBytes);
4363+ auto key = makeStoredDocKey (" bufferDcp" );
43664364 EXPECT_EQ (cb::engine_errc::success,
43674365 consumer->prepare (stream->getOpaque (),
43684366 key,
@@ -4384,7 +4382,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpCommit) {
43844382 sizeof (cb::mcbp::request::DcpPreparePayload) + key.size () + 2 ;
43854383 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
43864384
4387- // Force consumer to buffer
4385+ // Force consumer to backoff on acking bytes
43884386 auto & config = engine->getConfiguration ();
43894387 config.setMutationMemRatio (0.0 );
43904388 auto & stats = engine->getEpStats ();
@@ -4393,7 +4391,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpCommit) {
43934391 ASSERT_EQ (KVBucket::ReplicationThrottleStatus::Pause,
43944392 engine->getKVBucket ()->getReplicationThrottleStatus ());
43954393
4396- // Now buffer commit
4394+ // Now receive the commit
43974395 EXPECT_EQ (cb::engine_errc::success,
43984396 consumer->snapshotMarker (stream->getOpaque (),
43994397 vbid,
@@ -4402,8 +4400,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpCommit) {
44024400 MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
44034401 0 ,
44044402 0 ));
4405-
4406- // No change, snapshot is now buffered
4403+ // No change, marker hasn't been acked
44074404 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
44084405
44094406 EXPECT_EQ (cb::engine_errc::success,
@@ -4413,18 +4410,18 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpCommit) {
44134410 1 /* prepare*/ ,
44144411 2 /* commit*/ ));
44154412
4416- // No change, commit is now buffered
4413+ // No change, commit hasn't been acked
44174414 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
44184415
4419- // undo the adjustments so that processing of buffered items will work
4420- config.setMutationMemRatio (0.99 );
4416+ // Recover from OOM
4417+ config.setMutationMemRatio (1 );
44214418 engine->getEpStats ().setMaxDataSize (size);
44224419
4423- // And process buffered items
4420+ // And process the unacked bytes
44244421 EXPECT_EQ (more_to_process, consumer->processUnackedBytes ());
44254422 EXPECT_EQ (all_processed, consumer->processUnackedBytes ());
44264423
4427- // Snapshot and commit processed
4424+ // Marker and Commit acked
44284425 ackBytes += sizeof (protocol_binary_request_header) +
44294426 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2xPayload) +
44304427 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2_0Value);
0 commit comments