@@ -1519,4 +1519,45 @@ TEST(ConsumerTest, testDuplicatedTopics) {
15191519 }
15201520}
15211521
1522+ TEST (ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
1523+ Client client (lookupUrl);
1524+
1525+ const int MSG_COUNT = 100 ;
1526+ std::string topicName = " persistent://public/default/my-topic-" + std::to_string (time (nullptr ));
1527+
1528+ // 1. Create producer send 100 msgs
1529+ Producer producer;
1530+ ProducerConfiguration producerConfig;
1531+ producerConfig.setBatchingEnabled (false );
1532+ ASSERT_EQ (ResultOk, client.createProducer (topicName, producerConfig, producer));
1533+ for (int i = 0 ; i < MSG_COUNT; ++i) {
1534+ std::string msg = " my-message-" + std::to_string (i);
1535+ Message message = MessageBuilder ().setContent (msg).build ();
1536+ ASSERT_EQ (ResultOk, producer.send (message));
1537+ }
1538+
1539+ // 2. Create consumer with listener
1540+ Consumer consumer;
1541+ ConsumerConfiguration consumerConfig;
1542+ consumerConfig.setSubscriptionInitialPosition (InitialPositionEarliest);
1543+ Latch latchFirstReceiveMsg (1 );
1544+ Latch latchAfterClosed (1 );
1545+ consumerConfig.setMessageListener (
1546+ [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) {
1547+ latchFirstReceiveMsg.countdown ();
1548+ LOG_INFO (" Consume message: " << msg.getDataAsString ());
1549+ latchAfterClosed.wait ();
1550+ });
1551+ auto result = client.subscribe (topicName, " test-sub" , consumerConfig, consumer);
1552+ ASSERT_EQ (ResultOk, result);
1553+
1554+ // 3. wait first message consumed in listener and then close consumer.
1555+ latchFirstReceiveMsg.wait ();
1556+ ASSERT_EQ (ResultOk, consumer.close ());
1557+ latchAfterClosed.countdown ();
1558+
1559+ ASSERT_EQ (ResultOk, producer.close ());
1560+ ASSERT_EQ (ResultOk, client.close ());
1561+ }
1562+
15221563} // namespace pulsar
0 commit comments