Skip to content

Commit 70a55ef

Browse files
committed
[improve] modify the negativeACK structure to reduce memory overhead
1 parent 639786f commit 70a55ef

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,17 @@ if (INTEGRATE_VCPKG)
124124
find_package(protobuf CONFIG REQUIRED)
125125
find_package(zstd CONFIG REQUIRED)
126126
find_package(Snappy CONFIG REQUIRED)
127+
find_package(roaring CONFIG REQUIRED)
127128
set(COMMON_LIBS CURL::libcurl
128129
ZLIB::ZLIB
129130
OpenSSL::SSL
130131
OpenSSL::Crypto
131132
protobuf::libprotobuf
132133
$<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
133134
Snappy::snappy
135+
roaring::roaring
136+
roaring::roaring-headers
137+
roaring::roaring-headers-cpp
134138
)
135139
if (USE_ASIO)
136140
find_package(asio CONFIG REQUIRED)

lib/NegativeAcksTracker.cc

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,20 @@
1919

2020
#include "NegativeAcksTracker.h"
2121

22+
#include <bits/stdint-uintn.h>
23+
2224
#include <functional>
2325
#include <set>
26+
#include <utility>
2427

2528
#include "ClientImpl.h"
2629
#include "ConsumerImpl.h"
2730
#include "ExecutorService.h"
2831
#include "LogUtils.h"
2932
#include "MessageIdUtil.h"
33+
#include "pulsar/MessageBuilder.h"
34+
#include "pulsar/MessageId.h"
35+
#include "pulsar/MessageIdBuilder.h"
3036
DECLARE_LOG_OBJECT()
3137

3238
namespace pulsar {
@@ -75,13 +81,22 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
7581

7682
auto now = Clock::now();
7783

84+
// The map is sorted by time, so we can exit immediately when we traverse to a time that does not match
7885
for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
79-
if (it->second < now) {
80-
messagesToRedeliver.insert(it->first);
81-
it = nackedMessages_.erase(it);
82-
} else {
83-
++it;
86+
if (it->first > now) {
87+
// We are done with all the messages that need to be redelivered
88+
break;
89+
}
90+
91+
auto ledgerMap = it->second;
92+
for (auto ledgerIt = ledgerMap.begin(); ledgerIt != ledgerMap.end(); ++ledgerIt) {
93+
auto entrySet = ledgerIt->second;
94+
for (auto setIt = entrySet.begin(); setIt != entrySet.end(); ++setIt) {
95+
messagesToRedeliver.insert(
96+
MessageIdBuilder().ledgerId(ledgerIt->first).entryId(*setIt).build());
97+
}
8498
}
99+
it = nackedMessages_.erase(it);
85100
}
86101
lock.unlock();
87102

@@ -99,7 +114,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
99114
{
100115
std::lock_guard<std::mutex> lock{mutex_};
101116
// Erase batch id to group all nacks from same batch
102-
nackedMessages_[msgId] = now + nackDelay_;
117+
nackedMessages_[now][msgId.ledgerId()].add((uint64_t)msgId.entryId());
103118
}
104119

105120
scheduleTimer();

lib/NegativeAcksTracker.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
#include <map>
2828
#include <memory>
2929
#include <mutex>
30+
#include <roaring/roaring64map.hh>
31+
#include <unordered_map>
3032

3133
#include "AsioDefines.h"
3234
#include "AsioTimer.h"
@@ -39,6 +41,7 @@ class ClientImpl;
3941
using ClientImplPtr = std::shared_ptr<ClientImpl>;
4042
class ExecutorService;
4143
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
44+
using LedgerId = int64_t;
4245

4346
class NegativeAcksTracker : public std::enable_shared_from_this<NegativeAcksTracker> {
4447
public:
@@ -65,7 +68,7 @@ class NegativeAcksTracker : public std::enable_shared_from_this<NegativeAcksTrac
6568
std::chrono::milliseconds nackDelay_;
6669
std::chrono::milliseconds timerInterval_;
6770
typedef typename std::chrono::steady_clock Clock;
68-
std::map<MessageId, Clock::time_point> nackedMessages_;
71+
std::map<Clock::time_point, std::unordered_map<LedgerId, roaring::Roaring64Map>> nackedMessages_;
6972

7073
const DeadlineTimerPtr timer_;
7174
std::atomic_bool closed_{false};

vcpkg.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@
5454
{
5555
"name": "zstd",
5656
"version>=": "1.5.5"
57+
},
58+
{
59+
"name" : "roaring"
5760
}
5861
],
5962
"features": {

0 commit comments

Comments
 (0)