Skip to content

Commit 80ca91a

Browse files
committed
fix clang-tidy check
1 parent 27c1a99 commit 80ca91a

File tree

10 files changed

+49
-47
lines changed

10 files changed

+49
-47
lines changed

lib/ClientConnection.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,7 +1189,7 @@ void ClientConnection::sendPendingCommands() {
11891189
}
11901190
}
11911191

1192-
Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
1192+
Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) {
11931193
Lock lock(mutex_);
11941194

11951195
if (isClosed()) {
@@ -1389,12 +1389,12 @@ Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() {
13891389
return connectPromise_.getFuture();
13901390
}
13911391

1392-
void ClientConnection::registerProducer(int producerId, ProducerImplPtr producer) {
1392+
void ClientConnection::registerProducer(int producerId, const ProducerImplPtr& producer) {
13931393
Lock lock(mutex_);
13941394
producers_.insert(std::make_pair(producerId, producer));
13951395
}
13961396

1397-
void ClientConnection::registerConsumer(int consumerId, ConsumerImplPtr consumer) {
1397+
void ClientConnection::registerConsumer(int consumerId, const ConsumerImplPtr& consumer) {
13981398
Lock lock(mutex_);
13991399
consumers_.insert(std::make_pair(consumerId, consumer));
14001400
}

lib/ClientConnection.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
177177
void sendCommandInternal(const SharedBuffer& cmd);
178178
void sendMessage(const std::shared_ptr<SendArguments>& args);
179179

180-
void registerProducer(int producerId, ProducerImplPtr producer);
181-
void registerConsumer(int consumerId, ConsumerImplPtr consumer);
180+
void registerProducer(int producerId, const ProducerImplPtr& producer);
181+
void registerConsumer(int consumerId, const ConsumerImplPtr& consumer);
182182

183183
void removeProducer(int producerId);
184184
void removeConsumer(int consumerId);
@@ -187,7 +187,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
187187
* Send a request with a specific Id over the connection. The future will be
188188
* triggered when the response for this request is received
189189
*/
190-
Future<Result, ResponseData> sendRequestWithId(SharedBuffer cmd, int requestId);
190+
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId);
191191

192192
const std::string& brokerAddress() const;
193193

lib/ClientImpl.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI)
148148
return it->second;
149149
}
150150

151-
void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
151+
void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
152152
const CreateProducerCallback& callback, bool autoDownloadSchema) {
153153
if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
154154
throw std::invalid_argument("Batching and chunking of messages can't be enabled together");
@@ -239,7 +239,7 @@ void ClientImpl::handleProducerCreated(Result result, const ProducerImplBaseWeak
239239
}
240240

241241
void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
242-
const ReaderConfiguration& conf, ReaderCallback callback) {
242+
const ReaderConfiguration& conf, const ReaderCallback& callback) {
243243
TopicNamePtr topicName;
244244
{
245245
Lock lock(mutex_);
@@ -326,7 +326,8 @@ void ClientImpl::handleReaderMetadataLookup(Result result, const LookupDataResul
326326
}
327327

328328
void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
329-
const ConsumerConfiguration& conf, SubscribeCallback callback) {
329+
const ConsumerConfiguration& conf,
330+
const SubscribeCallback& callback) {
330331
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
331332

332333
Lock lock(mutex_);
@@ -378,7 +379,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace
378379
CommandGetTopicsOfNamespace_Mode mode,
379380
const std::string& subscriptionName,
380381
const ConsumerConfiguration& conf,
381-
SubscribeCallback callback) {
382+
const SubscribeCallback& callback) {
382383
if (result == ResultOk) {
383384
ConsumerImplBasePtr consumer;
384385

@@ -404,7 +405,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace
404405
}
405406

406407
void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
407-
const ConsumerConfiguration& conf, SubscribeCallback callback) {
408+
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
408409
TopicNamePtr topicNamePtr;
409410

410411
Lock lock(mutex_);
@@ -440,7 +441,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
440441
}
441442

442443
void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
443-
const ConsumerConfiguration& conf, SubscribeCallback callback) {
444+
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
444445
TopicNamePtr topicName;
445446
{
446447
Lock lock(mutex_);
@@ -468,7 +469,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub
468469

469470
void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata,
470471
const TopicNamePtr& topicName, const std::string& subscriptionName,
471-
ConsumerConfiguration conf, SubscribeCallback callback) {
472+
ConsumerConfiguration conf, const SubscribeCallback& callback) {
472473
if (result == ResultOk) {
473474
// generate random name if not supplied by the customer.
474475
if (conf.getConsumerName().empty()) {
@@ -625,7 +626,7 @@ void ClientImpl::handleGetPartitions(Result result, const LookupDataResultPtr& p
625626
callback(ResultOk, partitions);
626627
}
627628

628-
void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback) {
629+
void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback) {
629630
TopicNamePtr topicName;
630631
{
631632
Lock lock(mutex_);
@@ -644,7 +645,7 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
644645
std::placeholders::_2, topicName, callback));
645646
}
646647

647-
void ClientImpl::closeAsync(CloseCallback callback) {
648+
void ClientImpl::closeAsync(const CloseCallback& callback) {
648649
if (state_ != Open) {
649650
if (callback) {
650651
callback(ResultAlreadyClosed);

lib/ClientImpl.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,25 +76,25 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7676
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
7777
* that exists for the topic.
7878
*/
79-
void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
79+
void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
8080
const CreateProducerCallback& callback, bool autoDownloadSchema = false);
8181

8282
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
83-
const ConsumerConfiguration& conf, SubscribeCallback callback);
83+
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
8484

8585
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
86-
const ConsumerConfiguration& conf, SubscribeCallback callback);
86+
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
8787

8888
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
89-
const ConsumerConfiguration& conf, SubscribeCallback callback);
89+
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
9090

9191
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
92-
const ReaderConfiguration& conf, ReaderCallback callback);
92+
const ReaderConfiguration& conf, const ReaderCallback& callback);
9393

9494
void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
9595
const TableViewCallback& callback);
9696

97-
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
97+
void getPartitionsForTopicAsync(const std::string& topic, const GetPartitionsCallback& callback);
9898

9999
// Use virtual method to test
100100
virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI,
@@ -103,7 +103,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
103103
GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress,
104104
size_t key);
105105

106-
void closeAsync(CloseCallback callback);
106+
void closeAsync(const CloseCallback& callback);
107107
void shutdown();
108108

109109
MemoryLimitController& getMemoryLimitController();
@@ -143,7 +143,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
143143

144144
void handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata,
145145
const TopicNamePtr& topicName, const std::string& consumerName,
146-
ConsumerConfiguration conf, SubscribeCallback callback);
146+
ConsumerConfiguration conf, const SubscribeCallback& callback);
147147

148148
void handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata,
149149
const TopicNamePtr& topicName, const MessageId& startMessageId,
@@ -165,7 +165,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
165165
const std::string& regexPattern,
166166
CommandGetTopicsOfNamespace_Mode mode,
167167
const std::string& consumerName, const ConsumerConfiguration& conf,
168-
SubscribeCallback callback);
168+
const SubscribeCallback& callback);
169169

170170
const std::string& getPhysicalAddress(const std::string& redirectedClusterURI,
171171
const std::string& logicalAddress);

lib/ConsumerImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,7 +1700,7 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
17001700

17011701
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
17021702

1703-
void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
1703+
void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
17041704
const ResultCallback& callback) {
17051705
ClientConnectionPtr cnx = getCnx().lock();
17061706
if (!cnx) {
@@ -1729,7 +1729,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Se
17291729

17301730
std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
17311731

1732-
cnx->sendRequestWithId(std::move(seek), requestId)
1732+
cnx->sendRequestWithId(seek, requestId)
17331733
.addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
17341734
const ResponseData& responseData) {
17351735
auto self = weakSelf.lock();

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ class ConsumerImpl : public ConsumerImplBase {
214214
return os;
215215
}
216216

217-
void seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
217+
void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
218218
const ResultCallback& callback);
219219
void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb);
220220

lib/MultiTopicsConsumerImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ void MultiTopicsConsumerImpl::closeAsync(const ResultCallback& originalCallback)
510510
batchReceiveTimer_->cancel();
511511
}
512512

513-
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
513+
void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Message& msg) {
514514
if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
515515
return;
516516
}
@@ -1038,7 +1038,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
10381038
}
10391039
ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
10401040
auto weakSelf = weak_from_this();
1041-
config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
1041+
config.setMessageListener([this, weakSelf](const Consumer& consumer, const Message& msg) {
10421042
auto self = weakSelf.lock();
10431043
if (self) {
10441044
messageReceived(consumer, msg);

lib/MultiTopicsConsumerImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
137137
/* methods */
138138
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
139139
unsigned int partitionIndex);
140-
void notifyResult(CloseCallback closeCallback);
141-
void messageReceived(Consumer consumer, const Message& msg);
140+
void notifyResult(const CloseCallback& closeCallback);
141+
void messageReceived(const Consumer& consumer, const Message& msg);
142142
void messageProcessed(Message& msg);
143143
void internalListener(const Consumer& consumer);
144144
void receiveMessages();

tests/BasicEndToEndTest.cc

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ static void messageListenerFunction(Consumer consumer, const Message &msg) {
7878
consumer.acknowledge(msg);
7979
}
8080

81-
static void messageListenerFunctionWithoutAck(Consumer consumer, const Message &msg, Latch &latch,
81+
static void messageListenerFunctionWithoutAck(const Consumer &consumer, const Message &msg, Latch &latch,
8282
const std::string &content) {
8383
globalCount++;
8484
ASSERT_EQ(content, msg.getDataAsString());
8585
latch.countdown();
8686
}
8787

88-
static void sendCallBack(Result r, const MessageId &msgId, std::string prefix, int *count) {
88+
static void sendCallBack(Result r, const MessageId &msgId, const std::string &prefix, int *count) {
8989
static std::mutex sendMutex_;
9090
sendMutex_.lock();
9191
ASSERT_EQ(r, ResultOk);
@@ -111,8 +111,8 @@ static void receiveCallBack(Result r, const Message &msg, std::string &messageCo
111111
receiveMutex_.unlock();
112112
}
113113

114-
static void sendCallBackWithDelay(Result r, const MessageId &msgId, std::string prefix, double percentage,
115-
uint64_t delayInMicros, int *count) {
114+
static void sendCallBackWithDelay(Result r, const MessageId &msgId, const std::string &prefix,
115+
double percentage, uint64_t delayInMicros, int *count) {
116116
if ((rand() % 100) <= percentage) {
117117
std::this_thread::sleep_for(std::chrono::microseconds(delayInMicros));
118118
}
@@ -194,7 +194,7 @@ TEST(BasicEndToEndTest, testBatchMessages) {
194194
ASSERT_EQ(i, numOfMessages);
195195
}
196196

197-
void resendMessage(Result r, const MessageId msgId, Producer producer) {
197+
void resendMessage(Result r, const MessageId &msgId, Producer &producer) {
198198
std::unique_lock<std::mutex> lock(mutex_);
199199
if (r != ResultOk) {
200200
LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount);
@@ -508,7 +508,7 @@ TEST(BasicEndToEndTest, testInvalidUrlPassed) {
508508
EXPECT_THROW({ Client{"Dream of the day when this will be a valid URL"}; }, std::invalid_argument);
509509
}
510510

511-
void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::string topicName) {
511+
void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, const std::string &topicName) {
512512
Client client(lookupUrl);
513513

514514
// call admin api to make it partitioned
@@ -1172,7 +1172,7 @@ TEST(BasicEndToEndTest, testStatsLatencies) {
11721172
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
11731173
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
11741174

1175-
auto msgId = receivedMsg.getMessageId();
1175+
const auto &msgId = receivedMsg.getMessageId();
11761176
if (msgId.batchIndex() < 0) {
11771177
numAcks++;
11781178
} else if (msgId.batchIndex() + 1 == msgId.batchSize()) {
@@ -1803,7 +1803,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
18031803

18041804
static long messagesReceived = 0;
18051805

1806-
static void unackMessageListenerFunction(Consumer consumer, const Message &msg) { messagesReceived++; }
1806+
static void unackMessageListenerFunction(const Consumer &consumer, const Message &msg) { messagesReceived++; }
18071807

18081808
TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
18091809
Client client(lookupUrl);
@@ -3251,7 +3251,7 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
32513251

32523252
static long regexTestMessagesReceived = 0;
32533253

3254-
static void regexMessageListenerFunction(Consumer consumer, const Message &msg) {
3254+
static void regexMessageListenerFunction(const Consumer &consumer, const Message &msg) {
32553255
regexTestMessagesReceived++;
32563256
}
32573257

@@ -3428,7 +3428,7 @@ TEST(BasicEndToEndTest, testDelayedMessages) {
34283428
ASSERT_EQ("msg-2", msgReceived.getDataAsString());
34293429

34303430
auto result1 = client.close();
3431-
std::cout << "closed with " << result1 << std::endl;
3431+
std::cout << "closed with " << result1 << '\n';
34323432
ASSERT_EQ(ResultOk, result1);
34333433
}
34343434

@@ -3940,7 +3940,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
39403940

39413941
class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
39423942
public:
3943-
UnAckedMessageTrackerEnabledMock(long timeoutMs, const ClientImplPtr client, ConsumerImplBase &consumer)
3943+
UnAckedMessageTrackerEnabledMock(long timeoutMs, const ClientImplPtr &client, ConsumerImplBase &consumer)
39443944
: UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer) {}
39453945
const long getUnAckedMessagesTimeoutMs() { return this->timeoutMs_; }
39463946
const long getTickDurationInMs() { return this->tickDurationInMs_; }
@@ -4213,8 +4213,8 @@ void testBatchReceive(bool multiConsumer) {
42134213
ASSERT_EQ(ResultOk, producer.flush());
42144214
for (int i = 0; i < numOfMessages / batchReceiveMaxNumMessages; i++) {
42154215
Latch latch(1);
4216-
BatchReceiveCallback batchReceiveCallback = [&latch, batchReceiveMaxNumMessages](Result result,
4217-
Messages messages) {
4216+
BatchReceiveCallback batchReceiveCallback = [&latch, batchReceiveMaxNumMessages](
4217+
Result result, const Messages &messages) {
42184218
ASSERT_EQ(result, ResultOk);
42194219
ASSERT_EQ(messages.size(), batchReceiveMaxNumMessages);
42204220
latch.countdown();
@@ -4276,7 +4276,8 @@ void testBatchReceiveTimeout(bool multiConsumer) {
42764276
}
42774277

42784278
Latch latch(1);
4279-
BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
4279+
BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result,
4280+
const Messages &messages) {
42804281
ASSERT_EQ(result, ResultOk);
42814282
ASSERT_EQ(messages.size(), numOfMessages);
42824283
latch.countdown();
@@ -4321,7 +4322,7 @@ void testBatchReceiveClose(bool multiConsumer) {
43214322
ASSERT_EQ(ResultOk, result);
43224323

43234324
Latch latch(1);
4324-
BatchReceiveCallback batchReceiveCallback = [&latch](Result result, Messages messages) {
4325+
BatchReceiveCallback batchReceiveCallback = [&latch](Result result, const Messages &messages) {
43254326
ASSERT_EQ(result, ResultAlreadyClosed);
43264327
latch.countdown();
43274328
};

tests/ClientTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ TEST(ClientTest, testConnectionClose) {
405405

406406
const auto topic = "client-test-connection-close";
407407
for (auto &client : clients) {
408-
auto testClose = [&client](ClientConnectionWeakPtr weakCnx) {
408+
auto testClose = [&client](const ClientConnectionWeakPtr &weakCnx) {
409409
auto cnx = weakCnx.lock();
410410
ASSERT_TRUE(cnx);
411411

0 commit comments

Comments
 (0)