Skip to content

Commit 6daf7a5

Browse files
authored
Avoid blocking the message listener threads (apache#332)
### Motivation
The message listener thread blocks when the receiver queue of `MultiTopicsConsumerImpl` is full. As message listener threads are used by all consumers in the same `Client`, if one slow consumer blocks the listener threads, all other consumers can no longer receive new messages.
### Modifications
1. Modify `MultiTopicsConsumerImpl` to use `UnboundedBlockingQueue` to avoid blocking.
2. Modify the permit update logic: increase permits only after messages are consumed from `MultiTopicsConsumerImpl`.
1 parent 7cefe0e commit 6daf7a5

File tree

6 files changed

+92
-10
lines changed

6 files changed

+92
-10
lines changed

lib/ConsumerImpl.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
10321032
return;
10331033
}
10341034

1035-
increaseAvailablePermits(currentCnx);
1035+
if (!hasParent_) {
1036+
increaseAvailablePermits(currentCnx);
1037+
}
10361038
if (track) {
10371039
trackMessage(msg.getMessageId());
10381040
}
@@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
10891091
}
10901092
}
10911093

1094+
// Increases the available permits for `msg` by delegating to the
// (connection, delta) overload with its default delta, unless the message is
// tied to a connection other than the consumer's current one.
void ConsumerImpl::increaseAvailablePermits(const Message& msg) {
1095+
// Promote the weak connection handle; the result may be empty.
ClientConnectionPtr currentCnx = getCnx().lock();
1096+
// Only when a current connection exists AND it is not the one recorded on the
// message do we skip the permit.
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
1097+
LOG_DEBUG(getName() << "Not adding permit since connection is different.");
1098+
return;
1099+
}
1100+
1101+
// Reached for a matching connection, and also when currentCnx is empty; the
// (possibly empty) pointer is forwarded to the other overload unchanged.
increaseAvailablePermits(currentCnx);
1102+
}
1103+
10921104
inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
10931105
ConsumerType type = config_.getConsumerType();
10941106
switch (type) {

lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase {
168168
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
169169
CommandAck_ValidationError validationError);
170170
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
171+
void increaseAvailablePermits(const Message& msg);
171172
void drainIncomingMessageQueue(size_t count);
172173
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
173174
const BitSet& ackSet, int redeliveryCount);

lib/MessageImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class MessageImpl {
4747
int redeliveryCount_;
4848
bool hasSchemaVersion_;
4949
const std::string* schemaVersion_;
50+
std::weak_ptr<class ConsumerImpl> consumerPtr_;
5051

5152
const std::string& getPartitionKey() const;
5253
bool hasPartitionKey() const;

lib/MultiTopicsConsumerImpl.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
519519
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
520520
<< " message:" << msg.getDataAsString());
521521
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
522+
msg.impl_->consumerPtr_ = std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
522523

523524
Lock lock(pendingReceiveMutex_);
524525
if (!pendingReceives_.empty()) {
@@ -530,18 +531,15 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
530531
auto self = weakSelf.lock();
531532
if (self) {
532533
notifyPendingReceivedCallback(ResultOk, msg, callback);
534+
auto consumer = msg.impl_->consumerPtr_.lock();
535+
if (consumer) {
536+
consumer->increaseAvailablePermits(msg);
537+
}
533538
}
534539
});
535540
return;
536541
}
537542

538-
if (incomingMessages_.full()) {
539-
lock.unlock();
540-
}
541-
542-
// add message to block queue.
543-
// when messages queue is full, will block listener thread on ConsumerImpl,
544-
// then will not send permits to broker, will broker stop push message.
545543
incomingMessages_.push(msg);
546544
incomingMessagesSize_.fetch_add(msg.getLength());
547545

@@ -1072,6 +1070,10 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
10721070
// Bookkeeping for a message that has been taken from this multi-topics
// consumer: adjusts the buffered-size counter, registers the message id with
// the un-acked tracker, and asks the originating ConsumerImpl to increase its
// available permits.
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
10731071
// The message's length no longer counts toward incomingMessagesSize_.
incomingMessagesSize_.fetch_sub(msg.getLength());
10741072
unAckedMessageTrackerPtr_->add(msg.getMessageId());
1073+
// consumerPtr_ is a weak_ptr to the ConsumerImpl stored on the message;
// lock() yields null if that consumer no longer exists.
auto consumer = msg.impl_->consumerPtr_.lock();
1074+
if (consumer) {
1075+
// Permit is increased here, i.e. only once the message has been processed
// by this consumer.
consumer->increaseAvailablePermits(msg);
1076+
}
10751077
}
10761078

10771079
std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {

lib/MultiTopicsConsumerImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include <memory>
2525
#include <vector>
2626

27-
#include "BlockingQueue.h"
2827
#include "Commands.h"
2928
#include "ConsumerImplBase.h"
3029
#include "ConsumerInterceptors.h"
@@ -33,6 +32,7 @@
3332
#include "LookupDataResult.h"
3433
#include "SynchronizedHashMap.h"
3534
#include "TestUtil.h"
35+
#include "UnboundedBlockingQueue.h"
3636

3737
namespace pulsar {
3838
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
@@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
115115
std::map<std::string, int> topicsPartitions_;
116116
mutable std::mutex mutex_;
117117
std::mutex pendingReceiveMutex_;
118-
BlockingQueue<Message> incomingMessages_;
118+
UnboundedBlockingQueue<Message> incomingMessages_;
119119
std::atomic_int incomingMessagesSize_ = {0};
120120
MessageListener messageListener_;
121121
DeadlineTimerPtr partitionsUpdateTimer_;

tests/ConsumerTest.cc

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,4 +1335,70 @@ TEST(ConsumerTest, testRetrySubscribe) {
13351335
// milliseconds
13361336
}
13371337

1338+
// Two consumers (subscriptions "sub1" and "sub2") on the same client subscribe
// to a 2-partition topic with tiny receiver queues. consumer1 never calls
// receive(); the test then checks that consumer2 can still receive every
// message and that neither consumer's prefetched count exceeds the configured
// across-partitions limit.
TEST(ConsumerTest, testNoListenerThreadBlocking) {
1339+
Client client{lookupUrl};
1340+
1341+
const int numPartitions = 2;
1342+
// The topic name is suffixed with the current time.
const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr));
1343+
int res =
1344+
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
1345+
std::to_string(numPartitions));
1346+
// NOTE(review): presumably 204 = partitioned topic created, 409 = it already
// exists; both are accepted.
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
1347+
1348+
// One message per partition consumer, two in total across partitions.
const int receiverQueueSize = 1;
1349+
const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions;
1350+
1351+
Consumer consumer1, consumer2;
1352+
ConsumerConfiguration consumerConfig;
1353+
consumerConfig.setReceiverQueueSize(receiverQueueSize);
1354+
consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
1355+
Result consumerResult;
1356+
consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1);
1357+
ASSERT_EQ(consumerResult, ResultOk);
1358+
consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2);
1359+
ASSERT_EQ(consumerResult, ResultOk);
1360+
1361+
Producer producer;
1362+
ProducerConfiguration producerConfig;
1363+
// Batching off and round-robin routing for the producer.
producerConfig.setBatchingEnabled(false);
1364+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
1365+
Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer);
1366+
ASSERT_EQ(producerResult, ResultOk);
1367+
1368+
// 100x the total receiver queue capacity.
const int msgCount = receiverQueueSizeAcrossPartitions * 100;
1369+
1370+
for (int i = 0; i < msgCount; ++i) {
1371+
auto msg = MessageBuilder().setContent("test").build();
1372+
// The send callback is intentionally a no-op; results are not inspected.
producer.sendAsync(msg, [](Result code, const MessageId& messageId) {});
1373+
}
1374+
producer.flush();
1375+
producer.close();
1376+
1377+
// Wait (bounded by 1 second) for consumer1 to have prefetched exactly the
// across-partitions limit.
waitUntil(std::chrono::seconds(1), [consumer1] {
1378+
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
1379+
return multiConsumerImpl->getNumOfPrefetchedMessages() == receiverQueueSizeAcrossPartitions;
1380+
});
1381+
1382+
// Check that consumer1's prefetched message count stays within the limit.
1383+
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
1384+
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
1385+
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
1386+
1387+
// Read from consumer2 while consumer1 sits at its prefetch limit.
1388+
for (int i = 0; i < msgCount; ++i) {
1389+
// These locals shadow the consumer1 ones above and refer to consumer2.
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2);
1390+
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
1391+
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
1392+
1393+
Message msg;
1394+
// NOTE(review): the 1000 timeout is presumably in milliseconds — confirm.
Result ret = consumer2.receive(msg, 1000);
1395+
ASSERT_EQ(ret, ResultOk);
1396+
consumer2.acknowledge(msg);
1397+
}
1398+
1399+
consumer2.close();
1400+
consumer1.close();
1401+
client.close();
1402+
}
1403+
13381404
} // namespace pulsar

0 commit comments

Comments
 (0)