Skip to content

Commit 0b143e3

Browse files
committed
Restore the shutdown virtual methods for cleanup in ClientImpl
1 parent 3d2e0cd commit 0b143e3

13 files changed

+53
-29
lines changed

lib/ClientImpl.cc

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -739,15 +739,25 @@ void ClientImpl::shutdown() {
739739
auto producers = producers_.move();
740740
auto consumers = consumers_.move();
741741

742-
auto numProducers = producers.size();
743-
auto numConsumers = consumers.size();
744-
// Call the destructors out of the lock to avoid deadlocks
745-
producers.clear();
746-
consumers.clear();
742+
for (auto&& kv : producers) {
743+
ProducerImplBasePtr producer = kv.second.lock();
744+
if (producer) {
745+
producer->shutdown();
746+
}
747+
}
747748

748-
if (numProducers + numConsumers > 0) {
749-
LOG_DEBUG(numProducers << " producers and " << numConsumers << " consumers have been shutdown.");
749+
for (auto&& kv : consumers) {
750+
ConsumerImplBasePtr consumer = kv.second.lock();
751+
if (consumer) {
752+
consumer->shutdown();
753+
}
750754
}
755+
756+
if (producers.size() + consumers.size() > 0) {
757+
LOG_DEBUG(producers.size() << " producers and " << consumers.size()
758+
<< " consumers have been shutdown.");
759+
}
760+
751761
if (!pool_.close()) {
752762
// pool_ has already been closed. It means shutdown() has been called before.
753763
return;

lib/ConsumerImpl.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ ConsumerImpl::~ConsumerImpl() {
179179
LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command");
180180
}
181181
}
182-
shutdown();
182+
internalShutdown();
183183
}
184184

185185
void ConsumerImpl::setPartitionIndex(int partitionIndex) { partitionIndex_ = partitionIndex; }
@@ -373,7 +373,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) {
373373

374374
auto callback = [this, originalCallback](Result result) {
375375
if (result == ResultOk) {
376-
shutdown();
376+
internalShutdown();
377377
LOG_INFO(getName() << "Unsubscribed successfully");
378378
} else {
379379
state_ = Ready;
@@ -1312,7 +1312,7 @@ void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>& assign
13121312

13131313
void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13141314
auto callback = [this, originalCallback](Result result, bool alreadyClosed = false) {
1315-
shutdown();
1315+
internalShutdown();
13161316
if (result == ResultOk) {
13171317
if (!alreadyClosed) {
13181318
LOG_INFO(getName() << "Closed consumer " << consumerId_);
@@ -1368,7 +1368,9 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13681368

13691369
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
13701370

1371-
void ConsumerImpl::shutdown() {
1371+
void ConsumerImpl::shutdown() { internalShutdown(); }
1372+
1373+
void ConsumerImpl::internalShutdown() {
13721374
ackGroupingTrackerPtr_.reset();
13731375
incomingMessages_.clear();
13741376
possibleSendToDeadLetterTopicMessages_.clear();

lib/ConsumerImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ class ConsumerImpl : public ConsumerImplBase {
121121
void acknowledgeCumulativeAsync(const MessageId& msgId, const ResultCallback& callback) override;
122122
void closeAsync(const ResultCallback& callback) override;
123123
void start() override;
124-
void shutdown();
124+
void shutdown() override;
125+
void internalShutdown();
125126
bool isClosed() override;
126127
bool isOpen() override;
127128
Result pauseMessageListener() override;

lib/ConsumerImplBase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class ConsumerImplBase : public HandlerBase {
6363
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, const ResultCallback& callback) = 0;
6464
virtual void closeAsync(const ResultCallback& callback) = 0;
6565
virtual void start() = 0;
66+
virtual void shutdown() = 0;
6667
virtual bool isClosed() = 0;
6768
virtual bool isOpen() = 0;
6869
virtual Result pauseMessageListener() = 0;

lib/MultiTopicsConsumerImpl.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(const ResultCallback& originalCal
328328

329329
auto callback = [this, originalCallback](Result result) {
330330
if (result == ResultOk) {
331-
shutdown();
331+
internalShutdown();
332332
LOG_INFO(getName() << "Unsubscribed successfully");
333333
} else {
334334
state_ = Ready;
@@ -452,7 +452,7 @@ void MultiTopicsConsumerImpl::closeAsync(const ResultCallback& originalCallback)
452452
auto callback = [weakSelf, originalCallback](Result result) {
453453
auto self = weakSelf.lock();
454454
if (self) {
455-
self->shutdown();
455+
self->internalShutdown();
456456
if (result != ResultOk) {
457457
LOG_WARN(self->getName() << "Failed to close consumer: " << result);
458458
if (result != ResultAlreadyClosed) {
@@ -735,7 +735,7 @@ void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
735735
}
736736
}
737737

738-
MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { shutdown(); }
738+
MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { internalShutdown(); }
739739

740740
Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
741741
return multiTopicsConsumerCreatedPromise_.getFuture();
@@ -746,7 +746,9 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }
746746

747747
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
748748

749-
void MultiTopicsConsumerImpl::shutdown() {
749+
void MultiTopicsConsumerImpl::shutdown() { internalShutdown(); }
750+
751+
void MultiTopicsConsumerImpl::internalShutdown() {
750752
cancelTimers();
751753
incomingMessages_.clear();
752754
topicsPartitions_.clear();

lib/MultiTopicsConsumerImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
8181
void acknowledgeCumulativeAsync(const MessageId& msgId, const ResultCallback& callback) override;
8282
void closeAsync(const ResultCallback& callback) override;
8383
void start() override;
84-
void shutdown();
84+
void shutdown() override;
85+
void internalShutdown();
8586
bool isClosed() override;
8687
bool isOpen() override;
8788
Result pauseMessageListener() override;

lib/PartitionedProducerImpl.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
7979
}
8080
}
8181

82-
PartitionedProducerImpl::~PartitionedProducerImpl() { shutdown(); }
82+
PartitionedProducerImpl::~PartitionedProducerImpl() { internalShutdown(); }
8383
// override
8484
const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
8585

@@ -244,7 +244,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
244244
}
245245

246246
// override
247-
void PartitionedProducerImpl::shutdown() {
247+
void PartitionedProducerImpl::shutdown() { internalShutdown(); }
248+
249+
void PartitionedProducerImpl::internalShutdown() {
248250
cancelTimers();
249251
interceptors_->close();
250252
auto client = client_.lock();
@@ -284,7 +286,7 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
284286
void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
285287
auto closeCallback = [this, originalCallback](Result result) {
286288
if (result == ResultOk) {
287-
shutdown();
289+
internalShutdown();
288290
}
289291
if (originalCallback) {
290292
originalCallback(result);

lib/PartitionedProducerImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
7676
*/
7777
void closeAsync(CloseCallback callback) override;
7878
void start() override;
79-
void shutdown();
79+
void shutdown() override;
80+
void internalShutdown();
8081
bool isClosed() override;
8182
const std::string& getTopic() const override;
8283
Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;

lib/PatternMultiTopicsConsumerImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,9 @@ void PatternMultiTopicsConsumerImpl::start() {
242242
}
243243
}
244244

245-
void PatternMultiTopicsConsumerImpl::shutdown() {
245+
PatternMultiTopicsConsumerImpl::~PatternMultiTopicsConsumerImpl() {
246246
cancelTimers();
247-
MultiTopicsConsumerImpl::shutdown();
247+
internalShutdown();
248248
}
249249

250250
void PatternMultiTopicsConsumerImpl::closeAsync(const ResultCallback& callback) {

lib/PatternMultiTopicsConsumerImpl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
5454
const std::string& subscriptionName, const ConsumerConfiguration& conf,
5555
const LookupServicePtr& lookupServicePtr_,
5656
const ConsumerInterceptorsPtr& interceptors);
57+
~PatternMultiTopicsConsumerImpl() override;
5758

5859
const PULSAR_REGEX_NAMESPACE::regex getPattern();
5960

@@ -67,9 +68,8 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
6768
static NamespaceTopicsPtr topicsListsMinus(std::vector<std::string>& list1,
6869
std::vector<std::string>& list2);
6970

70-
virtual void closeAsync(const ResultCallback& callback);
71-
virtual void start();
72-
virtual void shutdown();
71+
void closeAsync(const ResultCallback& callback) override;
72+
void start() override;
7373

7474
private:
7575
const std::string patternString_;

0 commit comments

Comments
 (0)