Skip to content

Commit 537f13b

Browse files
committed
Fix lint errors by the clang-analyzer checks
1 parent b39bd89 commit 537f13b

17 files changed

+115
-124
lines changed

.clang-tidy

Lines changed: 47 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,58 +16,59 @@
1616
# under the License.
1717

1818
---
19+
#bugprone-*,
20+
#google-*,
21+
#misc-*,
22+
#modernize-*,
23+
#performance-*,
24+
#portability-*,
25+
#readability-*,
26+
#-google-readability-braces-around-statements,
27+
#-google-readability-namespace-comments,
28+
#-google-runtime-references,
29+
#-misc-non-private-member-variables-in-classes,
30+
#-misc-const-correctness,
31+
#-misc-include-cleaner,
32+
#-modernize-return-braced-init-list,
33+
#-modernize-use-trailing-return-type,
34+
#-modernize-concat-nested-namespaces,
35+
#-modernize-use-nodiscard,
36+
#-modernize-avoid-c-arrays,
37+
#-modernize-type-traits,
38+
#-modernize-unary-static-assert,
39+
#-performance-move-const-arg,
40+
#-performance-avoid-endl,
41+
#-performance-enum-size,
42+
#-readability-braces-around-statements,
43+
#-readability-identifier-length,
44+
#-readability-magic-numbers,
45+
#-readability-named-parameter,
46+
#-readability-redundant-declaration,
47+
#-readability-avoid-return-with-void-value,
48+
#-readability-function-cognitive-complexity,
49+
#-bugprone-narrowing-conversions,
50+
#-bugprone-easily-swappable-parameters,
51+
#-bugprone-inc-dec-in-conditions,
52+
#-bugprone-implicit-widening-of-multiplication-result,
53+
#-bugprone-unchecked-optional-access,
54+
#-bugprone-unused-local-non-trivial-variable,
55+
#-bugprone-unused-return-value,
56+
#-bugprone-reserved-identifier,
57+
#-google-build-using-namespace,
58+
#-misc-unused-parameters,
59+
#-modernize-deprecated-headers,
60+
#-modernize-use-equals-default,
61+
#-modernize-use-using,
62+
#-readability-enum-initial-value,
1963
Checks: >
20-
# The following checks are based on https://github.com/googleapis/google-cloud-cpp/blob/main/.clang-tidy
2164
-*,
22-
bugprone-*,
23-
google-*,
24-
misc-*,
25-
modernize-*,
26-
performance-*,
27-
portability-*,
28-
readability-*,
29-
-google-readability-braces-around-statements,
30-
-google-readability-namespace-comments,
31-
-google-runtime-references,
32-
-misc-non-private-member-variables-in-classes,
33-
-misc-const-correctness,
34-
-misc-include-cleaner,
35-
-modernize-return-braced-init-list,
36-
-modernize-use-trailing-return-type,
37-
-modernize-concat-nested-namespaces,
38-
-modernize-use-nodiscard,
39-
-modernize-avoid-c-arrays,
40-
-modernize-type-traits,
41-
-modernize-unary-static-assert,
42-
-performance-move-const-arg,
43-
-performance-avoid-endl,
44-
-performance-enum-size,
45-
-readability-braces-around-statements,
46-
-readability-identifier-length,
47-
-readability-magic-numbers,
48-
-readability-named-parameter,
49-
-readability-redundant-declaration,
50-
-readability-avoid-return-with-void-value,
51-
-readability-function-cognitive-complexity,
52-
-bugprone-narrowing-conversions,
53-
-bugprone-easily-swappable-parameters,
54-
-bugprone-inc-dec-in-conditions,
55-
-bugprone-implicit-widening-of-multiplication-result,
56-
-bugprone-unchecked-optional-access,
57-
-bugprone-unused-local-non-trivial-variable,
58-
-bugprone-unused-return-value,
59-
# Here are new excluded checks that are not in the original file
60-
-bugprone-reserved-identifier,
61-
-google-build-using-namespace,
62-
-modernize-deprecated-headers,
63-
-modernize-use-using,
64-
-readability-enum-initial-value,
65+
clang-analyzer-*,
66+
-clang-analyzer-security.insecureAPI.rand,
6567
WarningsAsErrors: '*'
6668
HeaderFileExtensions:
6769
- ''
6870
- h
6971
ImplementationFileExtensions:
70-
# TODO: the *.c files are not being linted because they use a different style
7172
- cc
7273
HeaderFilterRegex: '.*'
7374
FormatStyle: none
@@ -86,5 +87,6 @@ CheckOptions:
8687
readability-identifier-naming.FunctionCase: 'camelBack'
8788
readability-identifier-naming.MemberCase: 'camelBack'
8889
readability-identifier-naming.PrivateMemberSuffix: '_'
90+
readability-identifier-naming.ProtectedMemberSuffix: '_'
8991
readability-identifier-naming.PublicMemberSuffix: ''
9092
SystemHeaders: false

build-support/run_clang_tidy.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
# under the License.
1919
#
2020

21+
set -e
22+
cd `dirname $0`/..
23+
2124
FILES=$(find $PWD/include $PWD/lib $PWD/tests $PWD/examples -name "*.h" -o -name "*.cc" \
2225
| grep -v "lib\/c\/" | grep -v "lib\/checksum\/" | grep -v "lib\/lz4\/" \
2326
| grep -v "include\/pulsar\/c\/" | grep -v "tests\/c\/")

lib/AckGroupingTracker.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,6 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
9191
callback(ResultOk);
9292
}
9393

94-
/**
95-
* Flush all the pending grouped ACKs (as flush() does), and stop period ACKs sending.
96-
*/
97-
virtual void close() {}
98-
99-
/**
100-
* Flush all the pending grouped ACKs and send them to the broker.
101-
*/
102-
virtual void flush() {}
103-
10494
/**
10595
* Flush all the pending grouped ACKs (as flush() does), and clean all records about ACKed
10696
* messages, such as last cumulative ACKed message ID.

lib/AckGroupingTrackerEnabled.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId,
112112
}
113113
}
114114

115-
void AckGroupingTrackerEnabled::close() {
115+
AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
116116
isClosed_ = true;
117117
this->flush();
118118
std::lock_guard<std::mutex> lock(this->mutexTimer_);

lib/AckGroupingTrackerEnabled.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,14 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
5656
pendingIndividualCallbacks_.reserve(ackGroupingMaxSize);
5757
}
5858

59-
virtual ~AckGroupingTrackerEnabled() { this->close(); }
59+
~AckGroupingTrackerEnabled();
6060

6161
void start() override;
6262
bool isDuplicate(const MessageId& msgId) override;
6363
void addAcknowledge(const MessageId& msgId, ResultCallback callback) override;
6464
void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) override;
6565
void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) override;
66-
void close() override;
67-
void flush() override;
66+
void flush();
6867
void flushAndClean() override;
6968

7069
protected:

lib/ClientImpl.cc

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -735,23 +735,14 @@ void ClientImpl::shutdown() {
735735
auto producers = producers_.move();
736736
auto consumers = consumers_.move();
737737

738-
for (auto&& kv : producers) {
739-
ProducerImplBasePtr producer = kv.second.lock();
740-
if (producer) {
741-
producer->shutdown();
742-
}
743-
}
744-
745-
for (auto&& kv : consumers) {
746-
ConsumerImplBasePtr consumer = kv.second.lock();
747-
if (consumer) {
748-
consumer->shutdown();
749-
}
750-
}
751-
752-
if (producers.size() + consumers.size() > 0) {
753-
LOG_DEBUG(producers.size() << " producers and " << consumers.size()
754-
<< " consumers have been shutdown.");
738+
auto numProducers = producers.size();
739+
auto numConsumers = consumers.size();
740+
// Call the destructors out of the lock to avoid deadlocks
741+
producers.clear();
742+
consumers.clear();
743+
744+
if (numProducers + numConsumers > 0) {
745+
LOG_DEBUG(numProducers << " producers and " << numConsumers << " consumers have been shutdown.");
755746
}
756747
if (!pool_.close()) {
757748
// pool_ has already been closed. It means shutdown() has been called before.

lib/ConsumerConfigurationImpl.h

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,48 +21,45 @@
2121

2222
#include <pulsar/ConsumerConfiguration.h>
2323

24-
#include <chrono>
25-
2624
namespace pulsar {
2725
struct ConsumerConfigurationImpl {
28-
SchemaInfo schemaInfo;
2926
long unAckedMessagesTimeoutMs{0};
3027
long tickDurationInMs{1000};
31-
3228
long negativeAckRedeliveryDelayMs{60000};
3329
long ackGroupingTimeMs{100};
3430
long ackGroupingMaxSize{1000};
35-
ConsumerType consumerType{ConsumerExclusive};
36-
MessageListener messageListener;
37-
bool hasMessageListener{false};
31+
long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds
32+
long expireTimeOfIncompleteChunkedMessageMs{60000};
33+
SchemaInfo schemaInfo;
3834
ConsumerEventListenerPtr eventListener;
35+
CryptoKeyReaderPtr cryptoKeyReader;
36+
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
37+
int patternAutoDiscoveryPeriod{60};
38+
RegexSubscriptionMode regexSubscriptionMode{RegexSubscriptionMode::PersistentOnly};
39+
int priorityLevel{0};
40+
bool hasMessageListener{false};
3941
bool hasConsumerEventListener{false};
42+
bool readCompacted{false};
43+
bool replicateSubscriptionStateEnabled{false};
44+
bool autoAckOldestChunkedMessageOnQueueFull{false};
45+
bool startMessageIdInclusive{false};
46+
bool batchIndexAckEnabled{false};
47+
bool ackReceiptEnabled{false};
48+
bool startPaused{false};
49+
50+
size_t maxPendingChunkedMessage{10};
51+
ConsumerType consumerType{ConsumerExclusive};
52+
MessageListener messageListener;
4053
int receiverQueueSize{1000};
4154
int maxTotalReceiverQueueSizeAcrossPartitions{50000};
4255
std::string consumerName;
43-
long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds
44-
CryptoKeyReaderPtr cryptoKeyReader;
4556
ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
46-
bool readCompacted{false};
47-
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
4857
BatchReceivePolicy batchReceivePolicy{};
4958
DeadLetterPolicy deadLetterPolicy;
50-
int patternAutoDiscoveryPeriod{60};
51-
RegexSubscriptionMode regexSubscriptionMode{RegexSubscriptionMode::PersistentOnly};
52-
53-
bool replicateSubscriptionStateEnabled{false};
5459
std::map<std::string, std::string> properties;
5560
std::map<std::string, std::string> subscriptionProperties;
56-
int priorityLevel{0};
5761
KeySharedPolicy keySharedPolicy;
58-
size_t maxPendingChunkedMessage{10};
59-
bool autoAckOldestChunkedMessageOnQueueFull{false};
60-
bool startMessageIdInclusive{false};
61-
long expireTimeOfIncompleteChunkedMessageMs{60000};
62-
bool batchIndexAckEnabled{false};
6362
std::vector<ConsumerInterceptorPtr> interceptors;
64-
bool ackReceiptEnabled{false};
65-
bool startPaused{false};
6663
};
6764
} // namespace pulsar
6865
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */

lib/ConsumerImpl.cc

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,23 +159,23 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
159159
}
160160

161161
ConsumerImpl::~ConsumerImpl() {
162-
LOG_DEBUG(getName() << "~ConsumerImpl");
162+
LOG_DEBUG(consumerStr_ << "~ConsumerImpl");
163163
if (state_ == Ready) {
164164
// this could happen at least in this condition:
165165
// consumer seek, caused reconnection, if consumer close happened before connection ready,
166166
// then consumer will not send closeConsumer to Broker side, and caused a leak of consumer in
167167
// broker.
168-
LOG_WARN(getName() << "Destroyed consumer which was not properly closed");
168+
LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed");
169169

170170
ClientConnectionPtr cnx = getCnx().lock();
171171
ClientImplPtr client = client_.lock();
172172
if (client && cnx) {
173173
int requestId = client->newRequestId();
174174
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
175175
cnx->removeConsumer(consumerId_);
176-
LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_);
176+
LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_);
177177
} else {
178-
LOG_WARN(getName() << "Client is destroyed and cannot send the CloseConsumer command");
178+
LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command");
179179
}
180180
}
181181
shutdown();
@@ -1330,9 +1330,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
13301330
incomingMessages_.close();
13311331

13321332
// Flush pending grouped ACK requests.
1333-
if (ackGroupingTrackerPtr_) {
1334-
ackGroupingTrackerPtr_->close();
1335-
}
1333+
ackGroupingTrackerPtr_.reset();
13361334
negativeAcksTracker_->close();
13371335

13381336
ClientConnectionPtr cnx = getCnx().lock();
@@ -1360,9 +1358,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
13601358
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
13611359

13621360
void ConsumerImpl::shutdown() {
1363-
if (ackGroupingTrackerPtr_) {
1364-
ackGroupingTrackerPtr_->close();
1365-
}
1361+
ackGroupingTrackerPtr_.reset();
13661362
incomingMessages_.clear();
13671363
possibleSendToDeadLetterTopicMessages_.clear();
13681364
resetCnx();

lib/ConsumerImpl.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class ConsumerImpl : public ConsumerImplBase {
120120
void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
121121
void closeAsync(ResultCallback callback) override;
122122
void start() override;
123-
void shutdown() override;
123+
void shutdown();
124124
bool isClosed() override;
125125
bool isOpen() override;
126126
Result pauseMessageListener() override;
@@ -277,9 +277,9 @@ class ConsumerImpl : public ConsumerImplBase {
277277
ChunkedMessageCtx(const ChunkedMessageCtx&) = delete;
278278
// Here we don't use =default to be compatible with GCC 4.8
279279
ChunkedMessageCtx(ChunkedMessageCtx&& rhs) noexcept
280-
: totalChunks_(rhs.totalChunks_),
281-
chunkedMsgBuffer_(std::move(rhs.chunkedMsgBuffer_)),
282-
chunkedMessageIds_(std::move(rhs.chunkedMessageIds_)) {}
280+
: totalChunks_(rhs.totalChunks_), chunkedMsgBuffer_(std::move(rhs.chunkedMsgBuffer_)) {
281+
std::swap(chunkedMessageIds_, rhs.chunkedMessageIds_);
282+
}
283283

284284
bool validateChunkId(int chunkId) const noexcept { return chunkId == numChunks(); }
285285

@@ -295,7 +295,11 @@ class ConsumerImpl : public ConsumerImplBase {
295295

296296
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
297297

298-
std::vector<MessageId> moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); }
298+
std::vector<MessageId> moveChunkedMessageIds() noexcept {
299+
std::vector<MessageId> result;
300+
result.swap(chunkedMessageIds_);
301+
return result;
302+
}
299303

300304
long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
301305

lib/ConsumerImplBase.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ class ConsumerImplBase : public HandlerBase {
6363
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
6464
virtual void closeAsync(ResultCallback callback) = 0;
6565
virtual void start() = 0;
66-
virtual void shutdown() = 0;
6766
virtual bool isClosed() = 0;
6867
virtual bool isOpen() = 0;
6968
virtual Result pauseMessageListener() = 0;

0 commit comments

Comments
 (0)