@@ -44,6 +44,15 @@ class STDcpTest : public STParameterizedBucketTest {
4444 */
4545 void testConsumerNegotiatesIncludeDeletedUserXattrs (
4646 IncludeDeletedUserXattrs producerState);
47+
48+ /* *
49+ * Creates a consumer conn and makes the consumer processor task run with
50+ * memory usage near to replication threshold
51+ *
52+ * @param beyondThreshold indicates if the memory usage should above the
53+ * threshold or just below it
54+ */
55+ void processConsumerMutationsNearThreshold (bool beyondThreshold);
4756};
4857
4958/*
@@ -412,6 +421,150 @@ TEST_P(STDcpTest, ConsumerNegotiatesDeletedUserXattrs_EnabledAtProducer) {
412421 IncludeDeletedUserXattrs::Yes);
413422}
414423
424+ void STDcpTest::processConsumerMutationsNearThreshold (bool beyondThreshold) {
425+ const void * cookie = create_mock_cookie (engine.get ());
426+ const uint32_t opaque = 1 ;
427+ const uint64_t snapStart = 1 , snapEnd = 10 ;
428+ const uint64_t bySeqno = snapStart;
429+
430+ /* Set up a consumer connection */
431+ auto & connMap = engine->getDcpConnMap ();
432+ auto & mockConnMap = static_cast <MockDcpConnMap&>(connMap);
433+ auto consumer =
434+ std::make_shared<MockDcpConsumer>(*engine, cookie, " test_consumer" );
435+ mockConnMap.addConn (cookie, consumer);
436+
437+ /* Replica vbucket */
438+ setVBucketStateAndRunPersistTask (vbid, vbucket_state_replica);
439+
440+ /* Passive stream */
441+ ASSERT_EQ (ENGINE_SUCCESS,
442+ consumer->addStream (/* opaque*/ 0 ,
443+ vbid,
444+ /* flags*/ 0 ));
445+ MockPassiveStream* stream = static_cast <MockPassiveStream*>(
446+ (consumer->getVbucketStream (vbid)).get ());
447+ ASSERT_TRUE (stream->isActive ());
448+
449+ /* Send a snapshotMarker before sending items for replication */
450+ EXPECT_EQ (ENGINE_SUCCESS,
451+ consumer->snapshotMarker (opaque,
452+ vbid,
453+ snapStart,
454+ snapEnd,
455+ /* in-memory snapshot */ 0x1 ,
456+ /* HCS*/ {},
457+ /* maxVisibleSeqno*/ {}));
458+
459+ /* Simulate a situation where adding a mutation temporarily fails
460+ and hence adds the mutation to a replication buffer. For that, we
461+ set vbucket::takeover_backed_up to true */
462+ engine->getKVBucket ()->getVBucket (vbid)->setTakeoverBackedUpState (true );
463+
464+ /* Send an item for replication and expect it to be buffered */
465+ const DocKey docKey{" mykey" , DocKeyEncodesCollectionId::No};
466+ EXPECT_EQ (ENGINE_SUCCESS,
467+ consumer->mutation (opaque,
468+ docKey,
469+ {}, // value
470+ 0 , // priv bytes
471+ PROTOCOL_BINARY_RAW_BYTES,
472+ 0 , // cas
473+ vbid,
474+ 0 , // flags
475+ bySeqno,
476+ 0 , // rev seqno
477+ 0 , // exptime
478+ 0 , // locktime
479+ {}, // meta
480+ 0 )); // nru
481+ EXPECT_EQ (1 , stream->getNumBufferItems ());
482+
483+ /* Set back the vbucket::takeover_backed_up to false */
484+ engine->getKVBucket ()->getVBucket (vbid)->setTakeoverBackedUpState (false );
485+
486+ /* Set 'mem_used' beyond the 'replication threshold' */
487+ EPStats& stats = engine->getEpStats ();
488+ if (beyondThreshold) {
489+ /* Actually setting it well above also, as there can be a drop in memory
490+ usage during testing */
491+ stats.setMaxDataSize (stats.getPreciseTotalMemoryUsed () / 4 );
492+ } else {
493+ /* set max size to a value just over */
494+ stats.setMaxDataSize (stats.getPreciseTotalMemoryUsed () + 1 );
495+ /* Simpler to set the replication threshold to 1 and test, rather than
496+ testing with maxData = (memUsed / replicationThrottleThreshold); that
497+ is, we are avoiding a division */
498+ engine->getConfiguration ().setReplicationThrottleThreshold (100 );
499+ }
500+
501+ MockDcpMessageProducers producers (engine.get ());
502+ if ((engine->getConfiguration ().getBucketType () == " ephemeral" ) &&
503+ (engine->getConfiguration ().getEphemeralFullPolicy ()) ==
504+ " fail_new_data" ) {
505+ /* Make a call to the function that would be called by the processor
506+ task here */
507+ EXPECT_EQ (stop_processing, consumer->processBufferedItems ());
508+
509+ /* Expect the connection to be notified */
510+ EXPECT_FALSE (consumer->isPaused ());
511+
512+ /* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
513+ EXPECT_EQ (ENGINE_DISCONNECT, consumer->step (&producers));
514+ } else {
515+ uint32_t backfoffs = consumer->getNumBackoffs ();
516+
517+ /* Make a call to the function that would be called by the processor
518+ task here */
519+ if (beyondThreshold) {
520+ EXPECT_EQ (more_to_process, consumer->processBufferedItems ());
521+ } else {
522+ EXPECT_EQ (cannot_process, consumer->processBufferedItems ());
523+ }
524+
525+ EXPECT_EQ (backfoffs + 1 , consumer->getNumBackoffs ());
526+
527+ /* In 'couchbase' buckets we buffer the replica items and indirectly
528+ throttle replication by not sending flow control acks to the
529+ producer. Hence we do not drop the connection here */
530+ EXPECT_EQ (ENGINE_SUCCESS, consumer->step (&producers));
531+
532+ /* Close stream before deleting the connection */
533+ EXPECT_EQ (ENGINE_SUCCESS, consumer->closeStream (opaque, vbid));
534+ }
535+
536+ connMap.disconnect (cookie);
537+ EXPECT_FALSE (connMap.isDeadConnectionsEmpty ());
538+ connMap.manageConnections ();
539+ EXPECT_TRUE (connMap.isDeadConnectionsEmpty ());
540+ }
541+
542+ /* Here we test how the Processor task in DCP consumer handles the scenario
543+ where the memory usage is beyond the replication throttle threshold.
544+ In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
545+ indicate close of the consumer conn and in other cases it is expected to
546+ just defer processing. */
547+ TEST_P (STDcpTest, ProcessReplicationBufferAfterThrottleThreshold) {
548+ processConsumerMutationsNearThreshold (true );
549+ }
550+
551+ /* Here we test how the Processor task in DCP consumer handles the scenario
552+ where the memory usage is just below the replication throttle threshold,
553+ but will go over the threshold when it adds the new mutation from the
554+ processor buffer to the hashtable.
555+ In case of Ephemeral buckets with 'fail_new_data' policy it is expected to
556+ indicate close of the consumer conn and in other cases it is expected to
557+ just defer processing. */
558+ TEST_P (STDcpTest,
559+ DISABLED_ProcessReplicationBufferJustBeforeThrottleThreshold) {
560+ /* There are sporadic failures seen while testing this. The problem is
561+ we need to have a memory usage just below max_size, so we need to
562+ start at that point. But sometimes the memory usage goes further below
563+ resulting in the test failure (a hang). Hence commenting out the test.
564+ Can be run locally as and when needed. */
565+ processConsumerMutationsNearThreshold (false );
566+ }
567+
415568INSTANTIATE_TEST_SUITE_P (PersistentAndEphemeral,
416569 STDcpTest,
417570 STParameterizedBucketTest::allConfigValues ());
0 commit comments