@@ -1519,4 +1519,46 @@ TEST(ConsumerTest, testDuplicatedTopics) {
15191519 }
15201520}
15211521
1522+ TEST (ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
1523+ Client client (lookupUrl);
1524+
1525+ const int MSG_COUNT = 100 ;
1526+ std::string topicName = " persistent://public/default/my-topic-" + std::to_string (time (nullptr ));
1527+
1528+ // 1. Create producer send 100 msgs
1529+ Producer producer;
1530+ ProducerConfiguration producerConfig;
1531+ producerConfig.setBatchingEnabled (false );
1532+ ASSERT_EQ (ResultOk, client.createProducer (topicName, producerConfig, producer));
1533+ for (int i = 0 ; i < MSG_COUNT; ++i) {
1534+ std::string msg = " my-message-" + std::to_string (i);
1535+ Message message = MessageBuilder ().setContent (msg).build ();
1536+ ASSERT_EQ (ResultOk, producer.send (message));
1537+ }
1538+ ASSERT_EQ (ResultOk, producer.flush ());
1539+
1540+ // 2. Create consumer with listener
1541+ Consumer consumer;
1542+ ConsumerConfiguration consumerConfig;
1543+ consumerConfig.setSubscriptionInitialPosition (InitialPositionEarliest);
1544+ Latch latchFirstReceiveMsg (1 );
1545+ Latch latchAfterClosed (1 );
1546+ consumerConfig.setMessageListener ([&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) {
1547+ latchFirstReceiveMsg.countdown ();
1548+ std::cout << " Consume message: " << msg.getDataAsString () << std::endl;
1549+ latchAfterClosed.wait ();
1550+ });
1551+ auto result = client.subscribe (topicName, " test-sub" , consumerConfig, consumer);
1552+ ASSERT_EQ (ResultOk, result);
1553+
1554+ // 3. wait first message consumed in listener and then close consumer.
1555+ latchFirstReceiveMsg.wait ();
1556+ ASSERT_EQ (ResultOk, consumer.close ());
1557+ latchAfterClosed.countdown ();
1558+
1559+ ASSERT_EQ (ResultOk, producer.close ());
1560+ ASSERT_EQ (ResultOk, client.close ());
1561+ }
1562+
1563+
15221564} // namespace pulsar
0 commit comments