Skip to content

Commit fa413e1

Browse files
Fix duplicated subscribed topics not deduplicated (#501)
1 parent 5451797 commit fa413e1

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

lib/ClientImpl.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
#include <pulsar/ClientConfiguration.h>
2222
#include <pulsar/Version.h>
2323

24+
#include <algorithm>
2425
#include <chrono>
26+
#include <iterator>
2527
#include <random>
2628
#include <sstream>
2729

@@ -404,10 +406,17 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace
404406
}
405407
}
406408

407-
void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
408-
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
409+
void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
410+
const std::string& subscriptionName, const ConsumerConfiguration& conf,
411+
const SubscribeCallback& callback) {
409412
TopicNamePtr topicNamePtr;
410413

414+
// Remove duplicates from the list of topics
415+
auto topics = originalTopics;
416+
std::sort(topics.begin(), topics.end());
417+
auto it = std::unique(topics.begin(), topics.end());
418+
auto newSize = std::distance(topics.begin(), it);
419+
topics.resize(newSize);
411420
Lock lock(mutex_);
412421
if (state_ != Open) {
413422
lock.unlock();

tests/ConsumerTest.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,4 +1499,24 @@ TEST(ConsumerTest, testReconnectWhenFirstConnectTimedOut) {
14991499
client.close();
15001500
}
15011501

1502+
TEST(ConsumerTest, testDuplicatedTopics) {
1503+
Client client{lookupUrl};
1504+
auto topicPrefix = "consumer-test-duplicated-topics" + std::to_string(time(nullptr));
1505+
std::array<std::string, 3> uniqueTopics{topicPrefix + "0", topicPrefix + "1", topicPrefix + "2"};
1506+
std::vector<std::string> topics{uniqueTopics[1], uniqueTopics[0], uniqueTopics[2], uniqueTopics[1],
1507+
uniqueTopics[2]};
1508+
1509+
Consumer consumer;
1510+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));
1511+
1512+
for (size_t i = 0; i < uniqueTopics.size(); i++) {
1513+
Producer producer;
1514+
ASSERT_EQ(ResultOk, client.createProducer(topics[i], producer));
1515+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build()));
1516+
Message msg;
1517+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
1518+
ASSERT_EQ("msg-" + std::to_string(i), msg.getDataAsString());
1519+
}
1520+
}
1521+
15021522
} // namespace pulsar

0 commit comments

Comments
 (0)