Skip to content

Commit 386dedc

Browse files
authored
Fix nullptr after listener consumer closed (#510)
1 parent 4366ffe commit 386dedc

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

lib/MultiTopicsConsumerImpl.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Me
556556

557557
void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) {
558558
Message m;
559-
incomingMessages_.pop(m);
559+
if (!incomingMessages_.pop(m)) {
560+
return;
561+
}
560562
try {
561563
Consumer self{get_shared_this_ptr()};
562564
messageProcessed(m);

tests/ConsumerTest.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,4 +1519,45 @@ TEST(ConsumerTest, testDuplicatedTopics) {
15191519
}
15201520
}
15211521

1522+
TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
1523+
Client client(lookupUrl);
1524+
1525+
const int MSG_COUNT = 100;
1526+
std::string topicName = "persistent://public/default/my-topic-" + std::to_string(time(nullptr));
1527+
1528+
// 1. Create producer send 100 msgs
1529+
Producer producer;
1530+
ProducerConfiguration producerConfig;
1531+
producerConfig.setBatchingEnabled(false);
1532+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
1533+
for (int i = 0; i < MSG_COUNT; ++i) {
1534+
std::string msg = "my-message-" + std::to_string(i);
1535+
Message message = MessageBuilder().setContent(msg).build();
1536+
ASSERT_EQ(ResultOk, producer.send(message));
1537+
}
1538+
1539+
// 2. Create consumer with listener
1540+
Consumer consumer;
1541+
ConsumerConfiguration consumerConfig;
1542+
consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
1543+
Latch latchFirstReceiveMsg(1);
1544+
Latch latchAfterClosed(1);
1545+
consumerConfig.setMessageListener(
1546+
[&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) {
1547+
latchFirstReceiveMsg.countdown();
1548+
LOG_INFO("Consume message: " << msg.getDataAsString());
1549+
latchAfterClosed.wait();
1550+
});
1551+
auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer);
1552+
ASSERT_EQ(ResultOk, result);
1553+
1554+
// 3. wait first message consumed in listener and then close consumer.
1555+
latchFirstReceiveMsg.wait();
1556+
ASSERT_EQ(ResultOk, consumer.close());
1557+
latchAfterClosed.countdown();
1558+
1559+
ASSERT_EQ(ResultOk, producer.close());
1560+
ASSERT_EQ(ResultOk, client.close());
1561+
}
1562+
15221563
} // namespace pulsar

0 commit comments

Comments
 (0)