@@ -4430,10 +4430,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, FlowControlUnackedDcpCommit) {
44304430 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
44314431}
44324432
4433- // @todo MB-31869: review
4434- TEST_P (DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
4435- auto key = makeStoredDocKey (" bufferDcp" );
4436-
4433+ TEST_P (DurabilityPassiveStreamPersistentTest, FlowControlUnackedDcpAbort) {
44374434 // Messages go into the consumer so we update flow-control
44384435 EXPECT_EQ (cb::engine_errc::success,
44394436 consumer->snapshotMarker (stream->getOpaque (),
@@ -4449,6 +4446,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
44494446 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2xPayload) +
44504447 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2_0Value),
44514448 ackBytes);
4449+ auto key = makeStoredDocKey (" bufferDcp" );
44524450 EXPECT_EQ (cb::engine_errc::success,
44534451 consumer->prepare (stream->getOpaque (),
44544452 key,
@@ -4470,7 +4468,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
44704468 sizeof (cb::mcbp::request::DcpPreparePayload) + key.size () + 2 ;
44714469 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
44724470
4473- // Force consumer to buffer
4471+ // Force consumer to backoff on acking bytes
44744472 auto & config = engine->getConfiguration ();
44754473 config.setMutationMemRatio (0.0 );
44764474 auto & stats = engine->getEpStats ();
@@ -4479,7 +4477,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
44794477 ASSERT_EQ (KVBucket::ReplicationThrottleStatus::Pause,
44804478 engine->getKVBucket ()->getReplicationThrottleStatus ());
44814479
4482- // Now buffer abort
4480+ // Receive the Abort
44834481 EXPECT_EQ (cb::engine_errc::success,
44844482 consumer->snapshotMarker (stream->getOpaque (),
44854483 vbid,
@@ -4489,7 +4487,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
44894487 0 ,
44904488 0 ));
44914489
4492- // No change, snapshot is now buffered
4490+ // No change, marker hasn't been acked
44934491 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
44944492
44954493 EXPECT_EQ (cb::engine_errc::success,
@@ -4499,18 +4497,18 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
44994497 1 /* prepare*/ ,
45004498 2 /* abort*/ ));
45014499
4502- // No change, commit is now buffered
4500+ // No change, abort hasn't been acked
45034501 EXPECT_EQ (ackBytes, consumer->getFlowControl ().getFreedBytes ());
45044502
4505- // undo the adjustments so that processing of buffered items will work
4506- config.setMutationMemRatio (0.99 );
4503+ // Recover from OOM
4504+ config.setMutationMemRatio (1 );
45074505 engine->getEpStats ().setMaxDataSize (size);
45084506
4509- // And process buffered items
4507+ // And process unacked bytes
45104508 EXPECT_EQ (more_to_process, consumer->processUnackedBytes ());
45114509 EXPECT_EQ (all_processed, consumer->processUnackedBytes ());
45124510
4513- // Snapshot and commit processed
4511+ // Marker and Commit bytes acked
45144512 ackBytes += sizeof (protocol_binary_request_header) +
45154513 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2xPayload) +
45164514 sizeof (cb::mcbp::request::DcpSnapshotMarkerV2_0Value);
0 commit comments