Skip to content

Commit 3f0b33b

Browse files
authored
fix: Incorrect acknowledgment behavior in the listener of the multi-topic consumer. (apache#423)
### Motivation apache/pulsar-client-node#371 ### Modifications - Add the message to the unacknowledged tracker before call the listener. ### Verifying this change - Add `testMultiConsumerListenerAndAck` to cover it.
1 parent 27d8cc0 commit 3f0b33b

File tree

4 files changed

+45
-1
lines changed

4 files changed

+45
-1
lines changed

lib/MultiTopicsConsumerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,8 +568,8 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
568568
incomingMessages_.pop(m);
569569
try {
570570
Consumer self{get_shared_this_ptr()};
571-
messageListener_(self, m);
572571
messageProcessed(m);
572+
messageListener_(self, m);
573573
} catch (const std::exception& e) {
574574
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
575575
}

lib/MultiTopicsConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
183183
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
184184
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
185185
FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
186+
FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
186187
};
187188

188189
typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;

lib/UnAckedMessageTrackerEnabled.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAcked
6767
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
6868
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
6969
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
70+
FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
7071
};
7172
} // namespace pulsar
7273

tests/ConsumerTest.cc

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,4 +1490,46 @@ TEST(ConsumerTest, testSNIProxyConnect) {
14901490
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
14911491
client.close();
14921492
}
1493+
1494+
TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
1495+
Client client{lookupUrl};
1496+
1497+
const std::string topicName = "testConsumerEventWithPartition-topic-" + std::to_string(time(nullptr));
1498+
int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions",
1499+
std::to_string(5));
1500+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
1501+
1502+
// Create a producer
1503+
Producer producer;
1504+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
1505+
1506+
int num = 10;
1507+
// Use listener to consume
1508+
Latch latch{num};
1509+
Consumer consumer;
1510+
ConsumerConfiguration consumerConfiguration;
1511+
PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfiguration, 2000);
1512+
consumerConfiguration.setMessageListener([&latch](Consumer& consumer, const Message& msg) {
1513+
LOG_INFO("Received message '" << msg.getDataAsString() << "' and ack it");
1514+
consumer.acknowledge(msg);
1515+
latch.countdown();
1516+
});
1517+
ASSERT_EQ(ResultOk, client.subscribe(topicName, "consumer-1", consumerConfiguration, consumer));
1518+
1519+
// Send synchronously
1520+
for (int i = 0; i < 10; ++i) {
1521+
Message msg = MessageBuilder().setContent("content" + std::to_string(i)).build();
1522+
Result result = producer.send(msg);
1523+
LOG_INFO("Message sent: " << result);
1524+
}
1525+
1526+
ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
1527+
auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
1528+
auto tracker =
1529+
static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImplPtr->unAckedMessageTrackerPtr_.get());
1530+
ASSERT_EQ(0, tracker->size());
1531+
1532+
client.close();
1533+
}
1534+
14931535
} // namespace pulsar

0 commit comments

Comments
 (0)