Skip to content

Commit f9cca17

Browse files
BewareMyPowerRobertIndie
authored andcommitted
[fix] Fix acknowledge MessageId list does not work when ackGroupingTimeMs is 0 (#128)
1 parent 411843e commit f9cca17

File tree

6 files changed

+188
-125
lines changed

6 files changed

+188
-125
lines changed

lib/AckGroupingTracker.cc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,19 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
4646
return true;
4747
}
4848

49+
static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
50+
bool first = true;
51+
for (auto&& msgId : msgIds) {
52+
if (first) {
53+
first = false;
54+
} else {
55+
os << ", ";
56+
}
57+
os << "[" << msgId << "]";
58+
}
59+
return os;
60+
}
61+
4962
bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
5063
const std::set<MessageId>& msgIds) {
5164
auto cnx = connWeakPtr.lock();
@@ -54,8 +67,15 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
5467
return false;
5568
}
5669

57-
for (const auto& msgId : msgIds) {
58-
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
70+
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
71+
auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
72+
cnx->sendCommand(cmd);
73+
LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
74+
} else {
75+
// Broker does not support multi-message ACK, use multiple individual ACKs instead.
76+
for (const auto& msgId : msgIds) {
77+
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
78+
}
5979
}
6080
return true;
6181
}

lib/AckGroupingTrackerDisabled.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
3636
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
3737
}
3838

39+
void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
40+
std::set<MessageId> msgIdSet;
41+
for (auto&& msgId : msgIds) {
42+
msgIdSet.emplace(msgId);
43+
}
44+
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
45+
}
46+
3947
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
4048
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
4149
}

lib/AckGroupingTrackerDisabled.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class AckGroupingTrackerDisabled : public AckGroupingTracker {
4444
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
4545

4646
void addAcknowledge(const MessageId& msgId) override;
47+
void addAcknowledgeList(const MessageIdList& msgIds) override;
4748
void addAcknowledgeCumulative(const MessageId& msgId) override;
4849

4950
private:

lib/AckGroupingTrackerEnabled.cc

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,7 @@ void AckGroupingTrackerEnabled::flush() {
132132
// Send ACK for individual ACK requests.
133133
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
134134
if (!this->pendingIndividualAcks_.empty()) {
135-
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
136-
auto cmd = Commands::newMultiMessageAck(this->consumerId_, this->pendingIndividualAcks_);
137-
cnx->sendCommand(cmd);
138-
} else {
139-
// Broker does not support multi-message ACK, use multiple individual ACK instead.
140-
this->doImmediateAck(cnx, this->consumerId_, this->pendingIndividualAcks_);
141-
}
135+
this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
142136
this->pendingIndividualAcks_.clear();
143137
}
144138
}

tests/AcknowledgeTest.cc

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <gtest/gtest.h>
20+
#include <pulsar/Client.h>
21+
22+
#include "HttpHelper.h"
23+
#include "PulsarFriend.h"
24+
#include "lib/LogUtils.h"
25+
26+
DECLARE_LOG_OBJECT()
27+
28+
using namespace pulsar;
29+
30+
static std::string lookupUrl = "pulsar://localhost:6650";
31+
static std::string adminUrl = "http://localhost:8080/";
32+
33+
extern std::string unique_str();
34+
35+
class AcknowledgeTest : public testing::TestWithParam<int> {};
36+
37+
TEST_P(AcknowledgeTest, testAckMsgList) {
38+
Client client(lookupUrl);
39+
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
40+
41+
constexpr auto numMsg = 100;
42+
std::string uniqueChunk = unique_str();
43+
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
44+
const std::string subName = "sub-ack-list";
45+
46+
Producer producer;
47+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
48+
49+
ConsumerConfiguration consumerConfig;
50+
consumerConfig.setAckGroupingMaxSize(numMsg);
51+
consumerConfig.setAckGroupingTimeMs(GetParam());
52+
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
53+
Consumer consumer;
54+
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
55+
56+
// Sending and receiving messages.
57+
for (auto count = 0; count < numMsg; ++count) {
58+
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
59+
ASSERT_EQ(ResultOk, producer.send(msg));
60+
}
61+
62+
std::vector<MessageId> recvMsgId;
63+
for (auto count = 0; count < numMsg; ++count) {
64+
Message msg;
65+
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
66+
recvMsgId.emplace_back(msg.getMessageId());
67+
}
68+
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
69+
70+
// try redeliver unack messages.
71+
consumer.redeliverUnacknowledgedMessages();
72+
73+
auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
74+
auto ackMap = consumerStats->getAckedMsgMap();
75+
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
76+
ASSERT_EQ(totalAck, numMsg);
77+
78+
Message msg;
79+
auto ret = consumer.receive(msg, 1000);
80+
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
81+
82+
producer.close();
83+
consumer.close();
84+
client.close();
85+
}
86+
87+
TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
88+
Client client(lookupUrl);
89+
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
90+
91+
std::string uniqueChunk = unique_str();
92+
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
93+
94+
// call admin api to make it partitioned
95+
std::string url =
96+
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
97+
int res = makePutRequest(url, "5");
98+
LOG_INFO("res = " << res);
99+
ASSERT_FALSE(res != 204 && res != 409);
100+
101+
constexpr auto numMsg = 100;
102+
const std::string subName = "sub-ack-list";
103+
104+
Producer producer;
105+
ProducerConfiguration producerConfig;
106+
// Turn off batch to ensure even distribution
107+
producerConfig.setBatchingEnabled(false);
108+
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
109+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
110+
111+
ConsumerConfiguration consumerConfig;
112+
// set ack grouping max size is 10
113+
consumerConfig.setAckGroupingMaxSize(10);
114+
consumerConfig.setAckGroupingTimeMs(GetParam());
115+
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
116+
Consumer consumer;
117+
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
118+
119+
// Sending and receiving messages.
120+
for (auto count = 0; count < numMsg; ++count) {
121+
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
122+
ASSERT_EQ(ResultOk, producer.send(msg));
123+
}
124+
125+
std::vector<MessageId> recvMsgId;
126+
for (auto count = 0; count < numMsg; ++count) {
127+
Message msg;
128+
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
129+
recvMsgId.emplace_back(msg.getMessageId());
130+
}
131+
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
132+
133+
// try redeliver unack messages.
134+
consumer.redeliverUnacknowledgedMessages();
135+
136+
// assert stats
137+
unsigned long totalAck = 0;
138+
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
139+
for (auto consumerStats : consumerStatsList) {
140+
auto ackMap = consumerStats->getAckedMsgMap();
141+
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
142+
}
143+
ASSERT_EQ(totalAck, numMsg);
144+
145+
Message msg;
146+
auto ret = consumer.receive(msg, 1000);
147+
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
148+
149+
producer.close();
150+
consumer.close();
151+
client.close();
152+
}
153+
154+
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));

tests/BasicEndToEndTest.cc

Lines changed: 2 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <pulsar/Client.h>
2121

2222
#include <algorithm>
23+
#include <atomic>
2324
#include <chrono>
2425
#include <cstring>
2526
#include <functional>
@@ -58,7 +59,7 @@ static int globalCount = 0;
5859
static long globalResendMessageCount = 0;
5960
std::string lookupUrl = "pulsar://localhost:6650";
6061
static std::string adminUrl = "http://localhost:8080/";
61-
static int uniqueCounter = 0;
62+
static std::atomic_int uniqueCounter{0};
6263

6364
std::string unique_str() {
6465
long nanos = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -4276,118 +4277,3 @@ void testBatchReceiveClose(bool multiConsumer) {
42764277
TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }
42774278

42784279
TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }
4279-
4280-
TEST(BasicEndToEndTest, testAckMsgList) {
4281-
Client client(lookupUrl);
4282-
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
4283-
4284-
constexpr auto numMsg = 100;
4285-
std::string uniqueChunk = unique_str();
4286-
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
4287-
const std::string subName = "sub-ack-list";
4288-
4289-
Producer producer;
4290-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
4291-
4292-
ConsumerConfiguration consumerConfig;
4293-
consumerConfig.setAckGroupingMaxSize(numMsg);
4294-
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
4295-
Consumer consumer;
4296-
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
4297-
4298-
// Sending and receiving messages.
4299-
for (auto count = 0; count < numMsg; ++count) {
4300-
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
4301-
ASSERT_EQ(ResultOk, producer.send(msg));
4302-
}
4303-
4304-
std::vector<MessageId> recvMsgId;
4305-
for (auto count = 0; count < numMsg; ++count) {
4306-
Message msg;
4307-
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
4308-
recvMsgId.emplace_back(msg.getMessageId());
4309-
}
4310-
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
4311-
4312-
// try redeliver unack messages.
4313-
consumer.redeliverUnacknowledgedMessages();
4314-
4315-
auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
4316-
auto ackMap = consumerStats->getAckedMsgMap();
4317-
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
4318-
ASSERT_EQ(totalAck, numMsg);
4319-
4320-
Message msg;
4321-
auto ret = consumer.receive(msg, 1000);
4322-
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
4323-
4324-
producer.close();
4325-
consumer.close();
4326-
client.close();
4327-
}
4328-
4329-
TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) {
4330-
Client client(lookupUrl);
4331-
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
4332-
4333-
std::string uniqueChunk = unique_str();
4334-
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
4335-
4336-
// call admin api to make it partitioned
4337-
std::string url =
4338-
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
4339-
int res = makePutRequest(url, "5");
4340-
LOG_INFO("res = " << res);
4341-
ASSERT_FALSE(res != 204 && res != 409);
4342-
4343-
constexpr auto numMsg = 100;
4344-
const std::string subName = "sub-ack-list";
4345-
4346-
Producer producer;
4347-
ProducerConfiguration producerConfig;
4348-
// Turn off batch to ensure even distribution
4349-
producerConfig.setBatchingEnabled(false);
4350-
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
4351-
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
4352-
4353-
ConsumerConfiguration consumerConfig;
4354-
// set ack grouping max size is 10
4355-
consumerConfig.setAckGroupingMaxSize(10);
4356-
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
4357-
Consumer consumer;
4358-
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
4359-
4360-
// Sending and receiving messages.
4361-
for (auto count = 0; count < numMsg; ++count) {
4362-
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
4363-
ASSERT_EQ(ResultOk, producer.send(msg));
4364-
}
4365-
4366-
std::vector<MessageId> recvMsgId;
4367-
for (auto count = 0; count < numMsg; ++count) {
4368-
Message msg;
4369-
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
4370-
recvMsgId.emplace_back(msg.getMessageId());
4371-
}
4372-
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
4373-
4374-
// try redeliver unack messages.
4375-
consumer.redeliverUnacknowledgedMessages();
4376-
4377-
// assert stats
4378-
unsigned long totalAck = 0;
4379-
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
4380-
for (auto consumerStats : consumerStatsList) {
4381-
auto ackMap = consumerStats->getAckedMsgMap();
4382-
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
4383-
}
4384-
ASSERT_EQ(totalAck, numMsg);
4385-
4386-
Message msg;
4387-
auto ret = consumer.receive(msg, 1000);
4388-
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
4389-
4390-
producer.close();
4391-
consumer.close();
4392-
client.close();
4393-
}

0 commit comments

Comments
 (0)