@@ -705,6 +705,52 @@ TEST_P(STDcpTest, test_producer_no_stream_end_on_client_close_stream) {
705705 connMap.manageConnections ();
706706}
707707
708+ TEST_P (STDcpTest, test_consumer_add_stream) {
709+ setVBucketStateAndRunPersistTask (vbid, vbucket_state_replica);
710+
711+ auto & connMap = static_cast <MockDcpConnMap&>(engine->getDcpConnMap ());
712+
713+ const void * cookie = create_mock_cookie (engine.get ());
714+ Vbid vbid = Vbid (0 );
715+
716+ /* Create a Mock Dcp consumer */
717+ auto consumer =
718+ std::make_shared<MockDcpConsumer>(*engine, cookie, " test_consumer" );
719+ connMap.addConn (cookie, consumer);
720+
721+ ASSERT_EQ (ENGINE_SUCCESS,
722+ consumer->addStream (/* opaque*/ 0 ,
723+ vbid,
724+ /* flags*/ 0 ));
725+
726+ /* Set the passive to dead state. Note that we want to set the stream to
727+ dead state but not erase it from the streams map in the consumer
728+ connection*/
729+ MockPassiveStream* stream = static_cast <MockPassiveStream*>(
730+ (consumer->getVbucketStream (vbid)).get ());
731+
732+ stream->transitionStateToDead ();
733+
734+ /* Add a passive stream on the same vb */
735+ ASSERT_EQ (ENGINE_SUCCESS,
736+ consumer->addStream (/* opaque*/ 0 ,
737+ vbid,
738+ /* flags*/ 0 ));
739+
740+ /* Expected the newly added stream to be in active state */
741+ stream = static_cast <MockPassiveStream*>(
742+ (consumer->getVbucketStream (vbid)).get ());
743+ ASSERT_TRUE (stream->isActive ());
744+
745+ /* Close stream before deleting the connection */
746+ ASSERT_EQ (ENGINE_SUCCESS, consumer->closeStream (/* opaque*/ 0 , vbid));
747+
748+ connMap.disconnect (cookie);
749+ EXPECT_FALSE (connMap.isDeadConnectionsEmpty ());
750+ connMap.manageConnections ();
751+ EXPECT_TRUE (connMap.isDeadConnectionsEmpty ());
752+ }
753+
708754void STDcpTest::sendConsumerMutationsNearThreshold (bool beyondThreshold) {
709755 setVBucketStateAndRunPersistTask (vbid, vbucket_state_replica);
710756
0 commit comments