Skip to content

Commit 4c37ad8

Browse files
committed
Fix invalid memory access for UnAckedMessageTrackerEnabled and ConsumerStatsImpl
1 parent 670124f commit 4c37ad8

File tree

5 files changed

+12
-15
lines changed

5 files changed

+12
-15
lines changed

lib/ConsumerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic
129129
unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
130130
if (statsIntervalInSeconds) {
131131
consumerStatsBasePtr_ = std::make_shared<ConsumerStatsImpl>(
132-
consumerStr_, client->getIOExecutorProvider()->get(), statsIntervalInSeconds);
132+
consumerStr_, client->getIOExecutorProvider()->get()->createDeadlineTimer(),
133+
statsIntervalInSeconds);
133134
} else {
134135
consumerStatsBasePtr_ = std::make_shared<ConsumerStatsDisabled>();
135136
}

lib/UnAckedMessageTrackerEnabled.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ namespace pulsar {
3232

3333
void UnAckedMessageTrackerEnabled::timeoutHandler() {
3434
timeoutHandlerHelper();
35-
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
35+
auto client = client_.lock();
36+
if (client == nullptr) {
37+
return;
38+
}
39+
ExecutorServicePtr executorService = client->getIOExecutorProvider()->get();
3640
timer_ = executorService->createDeadlineTimer();
3741
timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
3842
std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};

lib/UnAckedMessageTrackerEnabled.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace pulsar {
3333
class ClientImpl;
3434
class ConsumerImplBase;
3535
using ClientImplPtr = std::shared_ptr<ClientImpl>;
36+
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
3637

3738
class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAckedMessageTrackerEnabled>,
3839
public UnAckedMessageTrackerInterface {
@@ -58,7 +59,7 @@ class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAcked
5859
std::deque<std::set<MessageId>> timePartitions;
5960
std::recursive_mutex lock_;
6061
ConsumerImplBase& consumerReference_;
61-
const ClientImplPtr& client_;
62+
ClientImplWeakPtr client_;
6263
DeadlineTimerPtr timer_; // DO NOT place this before client_!
6364
long timeoutMs_;
6465
long tickDurationInMs_;

lib/stats/ConsumerStatsImpl.cc

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919

2020
#include "ConsumerStatsImpl.h"
2121

22-
#include <functional>
23-
24-
#include "lib/ExecutorService.h"
2522
#include "lib/LogUtils.h"
2623
#include "lib/Utils.h"
2724

@@ -30,11 +27,9 @@ DECLARE_LOG_OBJECT();
3027

3128
using Lock = std::unique_lock<std::mutex>;
3229

33-
ConsumerStatsImpl::ConsumerStatsImpl(const std::string& consumerStr, const ExecutorServicePtr& executor,
30+
ConsumerStatsImpl::ConsumerStatsImpl(const std::string& consumerStr, DeadlineTimerPtr timer,
3431
unsigned int statsIntervalInSeconds)
35-
: consumerStr_(consumerStr),
36-
timer_(executor->createDeadlineTimer()),
37-
statsIntervalInSeconds_(statsIntervalInSeconds) {}
32+
: consumerStr_(consumerStr), timer_(std::move(timer)), statsIntervalInSeconds_(statsIntervalInSeconds) {}
3833

3934
ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats)
4035
: consumerStr_(stats.consumerStr_),

lib/stats/ConsumerStatsImpl.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,8 @@
2727

2828
#include "ConsumerStatsBase.h"
2929
#include "lib/AsioTimer.h"
30-
#include "lib/ExecutorService.h"
3130
namespace pulsar {
3231

33-
class ExecutorService;
34-
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
35-
3632
class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>, public ConsumerStatsBase {
3733
private:
3834
std::string consumerStr_;
@@ -55,7 +51,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
5551
friend class PulsarFriend;
5652

5753
public:
58-
ConsumerStatsImpl(const std::string&, const ExecutorServicePtr&, unsigned int);
54+
ConsumerStatsImpl(const std::string&, DeadlineTimerPtr, unsigned int);
5955
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
6056
void flushAndReset(const ASIO_ERROR&);
6157
void start() override;

0 commit comments

Comments
 (0)