@@ -917,8 +917,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
917917 std::string topicName = " partition-testMessageListenerPause" ;
918918
919919 // call admin api to make it partitioned
920- std::string url =
921- adminUrl + " admin/v2/persistent/public/default/partition-testMessageListener-pauses/partitions" ;
920+ std::string url = adminUrl + " admin/v2/persistent/public/default/" + topicName + " /partitions" ;
922921 int res = makePutRequest (url, " 5" );
923922
924923 LOG_INFO (" res = " << res);
@@ -968,6 +967,59 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
968967 client.close ();
969968}
970969
970+ void testStartPaused (bool isPartitioned) {
971+ Client client (lookupUrl);
972+ std::string topicName =
973+ isPartitioned ? " testStartPausedWithPartitionedTopic" : " testStartPausedWithNonPartitionedTopic" ;
974+ std::string subName = " sub" ;
975+
976+ if (isPartitioned) {
977+ // Call admin api to make it partitioned
978+ std::string url = adminUrl + " admin/v2/persistent/public/default/" + topicName + " /partitions" ;
979+ int res = makePutRequest (url, " 5" );
980+ LOG_INFO (" res = " << res);
981+ ASSERT_FALSE (res != 204 && res != 409 );
982+ }
983+
984+ Producer producer;
985+ Result result = client.createProducer (topicName, producer);
986+
987+ // Initializing global Count
988+ globalCount = 0 ;
989+
990+ ConsumerConfiguration consumerConfig;
991+ consumerConfig.setMessageListener (
992+ std::bind (messageListenerFunction, std::placeholders::_1, std::placeholders::_2));
993+ consumerConfig.setStartPaused (true );
994+ Consumer consumer;
995+ // Removing dangling subscription from previous test failures
996+ result = client.subscribe (topicName, subName, consumerConfig, consumer);
997+ consumer.unsubscribe ();
998+
999+ result = client.subscribe (topicName, subName, consumerConfig, consumer);
1000+ ASSERT_EQ (ResultOk, result);
1001+
1002+ int numOfMessages = 50 ;
1003+ for (int i = 0 ; i < numOfMessages; i++) {
1004+ std::string messageContent = " msg-" + std::to_string (i);
1005+ Message msg = MessageBuilder ().setContent (messageContent).build ();
1006+ ASSERT_EQ (ResultOk, producer.send (msg));
1007+ }
1008+
1009+ std::this_thread::sleep_for (std::chrono::microseconds (2 * 1000 * 1000 ));
1010+ ASSERT_EQ (globalCount, 0 );
1011+ consumer.resumeMessageListener ();
1012+ ASSERT_TRUE (waitUntil (std::chrono::seconds (5 ), [&]() -> bool { return globalCount >= numOfMessages; }));
1013+
1014+ consumer.unsubscribe ();
1015+ producer.close ();
1016+ client.close ();
1017+ }
1018+
1019+ TEST (BasicEndToEndTest, testStartPausedWithNonPartitionedTopic) { testStartPaused (false ); }
1020+
1021+ TEST (BasicEndToEndTest, testStartPausedWithPartitionedTopic) { testStartPaused (true ); }
1022+
9711023TEST (BasicEndToEndTest, testResendViaSendCallback) {
9721024 ClientConfiguration clientConfiguration;
9731025 clientConfiguration.setIOThreads (1 );
0 commit comments