Skip to content

Commit 4014fca

Browse files
committed
feat: add negativeack precision bit configuration and related tests
1 parent 8cede91 commit 4014fca

File tree

8 files changed

+110
-1
lines changed

8 files changed

+110
-1
lines changed

include/pulsar/ConsumerConfiguration.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
277277
*/
278278
long getNegativeAckRedeliveryDelayMs() const;
279279

280+
/**
281+
* Set the precision bit count for negative ack redelivery delay.
282+
* The lower bits of the redelivery time will be trimmed to reduce the memory occupation.
283+
* @param negativeAckPrecisionBitCnt
284+
* negative ack precision bit count
285+
*/
286+
void setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt);
287+
288+
/**
289+
* Get the configured precision bit count for negative ack redelivery delay.
290+
*
291+
* @return redelivery time precision bit count
292+
*/
293+
int getNegativeAckPrecisionBitCnt() const;
294+
280295
/**
281296
* Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
282297
* to broker until the time window reaches its end, or the number of grouped messages reaches

lib/ConsumerConfiguration.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ void ConsumerConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) {
134134
impl_->ackGroupingTimeMs = ackGroupingMillis;
135135
}
136136

137+
int ConsumerConfiguration::getNegativeAckPrecisionBitCnt() const { return impl_->negativeAckPrecisionBitCnt; }
138+
139+
void ConsumerConfiguration::setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt) {
140+
if (negativeAckPrecisionBitCnt < 0) {
141+
throw std::invalid_argument(
142+
"Consumer Config Exception: NegativeAckPrecisionBitCnt should be nonnegative number.");
143+
}
144+
impl_->negativeAckPrecisionBitCnt = negativeAckPrecisionBitCnt;
145+
}
146+
137147
long ConsumerConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; }
138148

139149
void ConsumerConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {

lib/ConsumerConfigurationImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct ConsumerConfigurationImpl {
2626
long unAckedMessagesTimeoutMs{0};
2727
long tickDurationInMs{1000};
2828
long negativeAckRedeliveryDelayMs{60000};
29+
int negativeAckPrecisionBitCnt{8};
2930
long ackGroupingTimeMs{100};
3031
long ackGroupingMaxSize{1000};
3132
long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds

lib/NegativeAcksTracker.cc

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr &client, ConsumerIm
4646
nackDelay_ =
4747
std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS));
4848
timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3));
49+
nackPrecisionBit_ = conf.getNegativeAckPrecisionBitCnt();
4950
LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() << " ms - Timer interval: "
5051
<< timerInterval_.count());
5152
}
@@ -106,14 +107,27 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
106107
scheduleTimer();
107108
}
108109

110+
std::chrono::steady_clock::time_point trimLowerBit(const std::chrono::steady_clock::time_point &tp,
111+
int bits) {
112+
// get origin timestamp in nanoseconds
113+
auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count();
114+
115+
// trim lower bits
116+
auto trimmedTimestamp = timestamp & (~((1LL << bits) - 1));
117+
118+
return std::chrono::steady_clock::time_point(std::chrono::nanoseconds(trimmedTimestamp));
119+
}
120+
109121
void NegativeAcksTracker::add(const MessageId &m) {
110122
auto msgId = discardBatch(m);
111123
auto now = Clock::now();
112124

113125
{
114126
std::lock_guard<std::mutex> lock{mutex_};
127+
auto trimmedTimestamp = trimLowerBit(now + nackDelay_, nackPrecisionBit_);
128+
// If the timestamp is already in the map, we can just add the message to the existing entry
115129
// Erase batch id to group all nacks from same batch
116-
nackedMessages_[now][msgId.ledgerId()].add((uint64_t)msgId.entryId());
130+
nackedMessages_[trimmedTimestamp][msgId.ledgerId()].add((uint64_t)msgId.entryId());
117131
}
118132

119133
scheduleTimer();

lib/NegativeAcksTracker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class NegativeAcksTracker : public std::enable_shared_from_this<NegativeAcksTrac
7272

7373
std::chrono::milliseconds nackDelay_;
7474
std::chrono::milliseconds timerInterval_;
75+
int nackPrecisionBit_;
7576
typedef typename std::chrono::steady_clock Clock;
7677
std::map<Clock::time_point, std::unordered_map<LedgerId, ConditionalRoaringMap>> nackedMessages_;
7778

lib/c/c_ConsumerConfiguration.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,16 @@ long pulsar_configure_get_negative_ack_redelivery_delay_ms(
121121
return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
122122
}
123123

124+
void pulsar_configure_set_negative_ack_precision_bit_cnt(
125+
pulsar_consumer_configuration_t *consumer_configuration, int negativeAckPrecisionBitCnt) {
126+
consumer_configuration->consumerConfiguration.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCnt);
127+
}
128+
129+
int pulsar_configure_get_negative_ack_precision_bit_cnt(
130+
pulsar_consumer_configuration_t *consumer_configuration) {
131+
return consumer_configuration->consumerConfiguration.getNegativeAckPrecisionBitCnt();
132+
}
133+
124134
void pulsar_configure_set_ack_grouping_time_ms(pulsar_consumer_configuration_t *consumer_configuration,
125135
long ackGroupingMillis) {
126136
consumer_configuration->consumerConfiguration.setAckGroupingTimeMs(ackGroupingMillis);

tests/BasicEndToEndTest.cc

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3249,6 +3249,60 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
32493249
testNegativeAcks(topicName, true);
32503250
}
32513251

3252+
int64_t getCurrentTimeMs() {
3253+
return std::chrono::duration_cast<std::chrono::milliseconds>(
3254+
std::chrono::system_clock::now().time_since_epoch())
3255+
.count();
3256+
}
3257+
3258+
void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCnt) {
3259+
constexpr int delayMs = 2000;
3260+
const int64_t timeDeviation = 1L << precisionBitCnt;
3261+
3262+
Client client(lookupUrl);
3263+
3264+
Consumer consumer;
3265+
ConsumerConfiguration conf;
3266+
conf.setNegativeAckRedeliveryDelayMs(delayMs);
3267+
conf.setNegativeAckPrecisionBitCnt(precisionBitCnt);
3268+
3269+
Result result = client.subscribe(topic, "sub1", conf, consumer);
3270+
ASSERT_EQ(ResultOk, result);
3271+
3272+
Producer producer;
3273+
ProducerConfiguration producerConf;
3274+
result = client.createProducer(topic, producerConf, producer);
3275+
ASSERT_EQ(ResultOk, result);
3276+
3277+
Message msg = MessageBuilder().setContent("test-0").build();
3278+
producer.sendAsync(msg, nullptr);
3279+
producer.flush();
3280+
3281+
// receive and trigger negative ack
3282+
Message received;
3283+
consumer.receive(received);
3284+
consumer.negativeAcknowledge(received);
3285+
3286+
int64_t expectedRedeliveryTime = getCurrentTimeMs() + delayMs;
3287+
3288+
Message redelivered;
3289+
consumer.receive(redelivered);
3290+
int64_t now = getCurrentTimeMs();
3291+
ASSERT_GE(now, expectedRedeliveryTime - timeDeviation);
3292+
ASSERT_EQ(redelivered.getDataAsString(), "test-0");
3293+
3294+
consumer.acknowledge(redelivered);
3295+
client.shutdown();
3296+
}
3297+
3298+
TEST(BasicEndToEndTest, testNegativeAckPrecisionBitCnt) {
3299+
for (int precisionBitCnt = 1; precisionBitCnt <= 12; precisionBitCnt++) {
3300+
std::string topic = "testNegativeAckPrecisionBitCnt-" + std::to_string(precisionBitCnt) + "-" +
3301+
std::to_string(time(nullptr));
3302+
testNegativeAckPrecisionBitCnt(topic, precisionBitCnt);
3303+
}
3304+
}
3305+
32523306
static long regexTestMessagesReceived = 0;
32533307

32543308
static void regexMessageListenerFunction(const Consumer &consumer, const Message &msg) {

tests/ConsumerConfigurationTest.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
5252
ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
5353
ASSERT_EQ(conf.getTickDurationInMs(), 1000);
5454
ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
55+
ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 8);
5556
ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
5657
ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
5758
ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);
@@ -114,6 +115,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
114115
conf.setNegativeAckRedeliveryDelayMs(10000);
115116
ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);
116117

118+
conf.setNegativeAckPrecisionBitCnt(4);
119+
ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 4);
120+
117121
conf.setAckGroupingTimeMs(200);
118122
ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);
119123

0 commit comments

Comments
 (0)