@@ -94,6 +94,15 @@ class STDcpTest : public STParameterizedBucketTest {
      * threshold or just below it
      */
     void processConsumerMutationsNearThreshold(bool beyondThreshold);
+
+    /**
+     * Creates a consumer conn and sends items on the conn with memory usage
+     * near to the replication threshold
+     *
+     * @param beyondThreshold indicates if the memory usage should be above
+     *        the threshold or just below it
+     */
+    void sendConsumerMutationsNearThreshold(bool beyondThreshold);
 };
 
 /*
@@ -696,6 +705,148 @@ TEST_P(STDcpTest, test_producer_no_stream_end_on_client_close_stream) {
     connMap.manageConnections();
 }
 
+void STDcpTest::sendConsumerMutationsNearThreshold(bool beyondThreshold) {
+    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
+
+    const void* cookie = create_mock_cookie(engine.get());
+    const uint32_t opaque = 1;
+    const uint64_t snapStart = 1;
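+    /* Use the maximum possible seqno for snapEnd so that the snapshot can
+       cover however many items we need to send before memory usage crosses
+       the threshold */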
+    const uint64_t snapEnd = std::numeric_limits<uint64_t>::max();
+    uint64_t bySeqno = snapStart;
+
+    /* Set up a consumer connection */
+    auto& connMap = static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
+    auto consumer =
+            std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
+    connMap.addConn(cookie, consumer);
+
+    /* Passive stream */
+    ASSERT_EQ(ENGINE_SUCCESS,
+              consumer->addStream(/*opaque*/ 0,
+                                  vbid,
+                                  /*flags*/ 0));
+    MockPassiveStream* stream = static_cast<MockPassiveStream*>(
+            (consumer->getVbucketStream(vbid)).get());
+    ASSERT_TRUE(stream->isActive());
+
+    /* Send a snapshotMarker before sending items for replication */
+    EXPECT_EQ(ENGINE_SUCCESS,
+              consumer->snapshotMarker(opaque,
+                                       vbid,
+                                       snapStart,
+                                       snapEnd,
+                                       /* in-memory snapshot */ 0x1,
+                                       {} /*HCS*/,
+                                       {} /*maxVisibleSeq*/));
+
+    /* Send an item for replication */
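+    /* The quota has not yet been shrunk at this point, so this first mutation
+       is expected to be accepted */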
+    const DocKey docKey{nullptr, 0, DocKeyEncodesCollectionId::No};
+    EXPECT_EQ(ENGINE_SUCCESS,
+              consumer->mutation(opaque,
+                                 docKey,
+                                 {}, // value
+                                 0, // priv bytes
+                                 PROTOCOL_BINARY_RAW_BYTES,
+                                 0, // cas
+                                 vbid,
+                                 0, // flags
+                                 bySeqno,
+                                 0, // rev seqno
+                                 0, // exptime
+                                 0, // locktime
+                                 {}, // meta
+                                 0)); // nru
+
+    /* Set 'mem_used' beyond the 'replication threshold' */
+    EPStats& stats = engine->getEpStats();
+    if (beyondThreshold) {
+        engine->setMaxDataSize(stats.getPreciseTotalMemoryUsed());
+    } else {
+        /* Set 'mem_used' just 1 byte less than the 'replication threshold'.
+           That is, we are below the 'replication threshold', but there is not
+           enough space for the new item */
+        engine->setMaxDataSize(stats.getPreciseTotalMemoryUsed() + 1);
+        /* Simpler to set the replication throttle threshold to 100% (a ratio
+           of 1) than to compute
+           maxData = (memUsed / replicationThrottleThreshold);
+           that is, we avoid a division */
+        engine->getConfiguration().setReplicationThrottleThreshold(100);
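+        /* With the threshold at 100% the throttle limit equals maxDataSize
+           (i.e. memUsed + 1), leaving exactly 1 byte of headroom before the
+           next item tips us over */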
+    }
+
+    if ((engine->getConfiguration().getBucketType() == "ephemeral") &&
+        (engine->getConfiguration().getEphemeralFullPolicy()) ==
+                "fail_new_data") {
+        /* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
+        while (true) {
+            /* Keep sending items till the memory usage goes above the
+               threshold and the connection is disconnected */
+            if (ENGINE_DISCONNECT ==
+                consumer->mutation(opaque,
+                                   docKey,
+                                   {}, // value
+                                   0, // priv bytes
+                                   PROTOCOL_BINARY_RAW_BYTES,
+                                   0, // cas
+                                   vbid,
+                                   0, // flags
+                                   ++bySeqno,
+                                   0, // rev seqno
+                                   0, // exptime
+                                   0, // locktime
+                                   {}, // meta
+                                   0)) {
+                break;
+            }
+        }
+    } else {
+        /* In 'couchbase' buckets we buffer the replica items and indirectly
+           throttle replication by not sending flow control acks to the
+           producer. Hence we do not drop the connection here */
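+        /* So the mutation below is expected to be accepted (buffered for
+           deferred processing) rather than rejected */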
+        EXPECT_EQ(ENGINE_SUCCESS,
+                  consumer->mutation(opaque,
+                                     docKey,
+                                     {}, // value
+                                     0, // priv bytes
+                                     PROTOCOL_BINARY_RAW_BYTES,
+                                     0, // cas
+                                     vbid,
+                                     0, // flags
+                                     bySeqno + 1,
+                                     0, // rev seqno
+                                     0, // exptime
+                                     0, // locktime
+                                     {}, // meta
+                                     0)); // nru
+    }
+
+    /* Close stream before deleting the connection */
+    EXPECT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
+
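+    /* disconnect() marks the consumer connection as dead; manageConnections()
+       then cleans it up, as verified by the two expectations below */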
+    connMap.disconnect(cookie);
+    EXPECT_FALSE(connMap.isDeadConnectionsEmpty());
+    connMap.manageConnections();
+    EXPECT_TRUE(connMap.isDeadConnectionsEmpty());
+}
+
+/* Here we test how the DCP consumer handles the scenario where the memory
+   usage is beyond the replication throttle threshold.
+   In the case of Ephemeral buckets with the 'fail_new_data' policy the
+   consumer is expected to indicate that the connection should be closed; in
+   all other cases it is expected to just defer processing. */
+TEST_P(STDcpTest, ReplicateAfterThrottleThreshold) {
+    sendConsumerMutationsNearThreshold(true);
+}
+
+/* Here we test how the DCP consumer handles the scenario where the memory
+   usage is just below the replication throttle threshold, but will go over
+   the threshold when it adds the new mutation from the processor buffer to
+   the hashtable.
+   In the case of Ephemeral buckets with the 'fail_new_data' policy the
+   consumer is expected to indicate that the connection should be closed; in
+   all other cases it is expected to just defer processing. */
+TEST_P(STDcpTest, ReplicateJustBeforeThrottleThreshold) {
+    sendConsumerMutationsNearThreshold(false);
+}
+
 INSTANTIATE_TEST_SUITE_P(PersistentAndEphemeral,
                          STDcpTest,
                          STParameterizedBucketTest::allConfigValues());