Skip to content

Commit cd7972b

Browse files
committed
Refactor the ack grouping tracker
1 parent 585519d commit cd7972b

File tree

8 files changed

+231
-299
lines changed

8 files changed

+231
-299
lines changed

lib/AckGroupingTracker.cc

Lines changed: 0 additions & 141 deletions
This file was deleted.

lib/AckGroupingTracker.h

Lines changed: 15 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@
2222
#include <pulsar/MessageId.h>
2323
#include <pulsar/Result.h>
2424

25-
#include <cstdint>
2625
#include <functional>
27-
#include <set>
2826

29-
#include "ProtoApiEnums.h"
3027
#include "lib/HandlerBase.h"
3128

3229
namespace pulsar {
@@ -35,7 +32,9 @@ class ClientConnection;
3532
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
3633
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
3734
using ResultCallback = std::function<void(Result)>;
38-
using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
35+
class ConsumerImpl;
36+
using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
37+
using ConsumerImplWeakPtr = std::weak_ptr<ConsumerImpl>;
3938

4039
/**
4140
* @class AckGroupingTracker
@@ -44,19 +43,12 @@ using HandlerBaseWeakPtr = std::weak_ptr<HandlerBase>;
4443
*/
4544
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
4645
public:
47-
AckGroupingTracker(std::function<uint64_t()> requestIdSupplier, uint64_t consumerId, bool waitResponse)
48-
: requestIdSupplier_(std::move(requestIdSupplier)),
49-
consumerId_(consumerId),
50-
waitResponse_(waitResponse) {}
51-
5246
virtual ~AckGroupingTracker() = default;
5347

5448
/**
5549
* Start tracking the ACK requests.
56-
*
57-
* @param[in] handler the handler to get a ClientConnection for sending the ACK requests.
5850
*/
59-
virtual void start(const HandlerBaseWeakPtr& handler) { handler_ = handler; }
51+
virtual void start(const ConsumerImplPtr& consumer) { consumer_ = consumer; }
6052

6153
/**
6254
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
@@ -74,7 +66,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
7466
* @param[in] callback the callback that is triggered when the message is acknowledged
7567
*/
7668
virtual void addAcknowledge(const MessageId& msgId, const ResultCallback& callback) {
77-
callback(ResultOk);
69+
if (callback) {
70+
callback(ResultOk);
71+
}
7872
}
7973

8074
/**
@@ -83,7 +77,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
8377
* @param[in] callback the callback that is triggered when the messages are acknowledged
8478
*/
8579
virtual void addAcknowledgeList(const MessageIdList& msgIds, const ResultCallback& callback) {
86-
callback(ResultOk);
80+
if (callback) {
81+
callback(ResultOk);
82+
}
8783
}
8884

8985
/**
@@ -92,7 +88,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
9288
* @param[in] callback the callback that is triggered when the message is acknowledged
9389
*/
9490
virtual void addAcknowledgeCumulative(const MessageId& msgId, const ResultCallback& callback) {
95-
callback(ResultOk);
91+
if (callback) {
92+
callback(ResultOk);
93+
}
9694
}
9795

9896
/**
@@ -101,42 +99,10 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
10199
*/
102100
virtual void flushAndClean() {}
103101

104-
/**
105-
* Close the ACK grouping tracker, which will prevent further ACK requests being sent.
106-
*/
107-
virtual void close() { isClosed_.store(true, std::memory_order_relaxed); }
108-
109-
protected:
110-
void doImmediateAck(const MessageId& msgId, const ResultCallback& callback,
111-
CommandAck_AckType ackType) const;
112-
void doImmediateAck(const std::set<MessageId>& msgIds, const ResultCallback& callback) const;
113-
bool isClosed() const noexcept { return isClosed_.load(std::memory_order_relaxed); }
114-
bool validateClosed(const ResultCallback& callback) const {
115-
if (isClosed()) {
116-
if (callback) {
117-
callback(ResultAlreadyClosed);
118-
}
119-
return true;
120-
}
121-
return false;
122-
}
123-
124-
private:
125-
std::weak_ptr<HandlerBase> handler_;
126-
const std::function<uint64_t()> requestIdSupplier_;
127-
const uint64_t consumerId_;
128-
std::atomic_bool isClosed_{false};
129-
130-
ClientConnectionPtr getConnection() const {
131-
auto handler = handler_.lock();
132-
if (!handler) {
133-
return nullptr;
134-
}
135-
return handler->getCnx().lock();
136-
}
102+
virtual void close() {}
137103

138104
protected:
139-
const bool waitResponse_;
105+
ConsumerImplWeakPtr consumer_;
140106

141107
}; // class AckGroupingTracker
142108

lib/AckGroupingTrackerDisabled.cc

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,41 @@
1919

2020
#include "AckGroupingTrackerDisabled.h"
2121

22-
#include "ProtoApiEnums.h"
22+
#include "ConsumerImpl.h"
2323

2424
namespace pulsar {
2525

2626
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, const ResultCallback& callback) {
27-
doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
27+
auto consumer = consumer_.lock();
28+
if (consumer && !consumer->isClosingOrClosed()) {
29+
consumer->doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
30+
} else if (callback) {
31+
callback(ResultAlreadyClosed);
32+
}
2833
}
2934

3035
void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds,
3136
const ResultCallback& callback) {
32-
std::set<MessageId> msgIdSet;
33-
for (auto&& msgId : msgIds) {
34-
msgIdSet.emplace(msgId);
37+
auto consumer = consumer_.lock();
38+
if (consumer && !consumer->isClosingOrClosed()) {
39+
std::set<MessageId> uniqueMsgIds(msgIds.begin(), msgIds.end());
40+
for (auto&& msgId : msgIds) {
41+
uniqueMsgIds.insert(msgId);
42+
}
43+
consumer->doImmediateAck(uniqueMsgIds, callback);
44+
} else if (callback) {
45+
callback(ResultAlreadyClosed);
3546
}
36-
doImmediateAck(msgIdSet, callback);
3747
}
3848

3949
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId,
4050
const ResultCallback& callback) {
41-
doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
51+
auto consumer = consumer_.lock();
52+
if (consumer && !consumer->isClosingOrClosed()) {
53+
consumer->doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
54+
} else if (callback) {
55+
callback(ResultAlreadyClosed);
56+
}
4257
}
4358

4459
} // namespace pulsar

0 commit comments

Comments
 (0)