Skip to content

Commit 33085eb

Browse files
Fix topic not shown correctly in the consumer string (apache#329)
### Motivation `ConsumerImpl::getName()` returns a string that is used in logs to represent the consumer. However, the topic part does not show correctly: ``` ConsumerImpl:283 | [0x6000001c88b8, consumer-1, 0] Created consumer on broker [127.0.0.1:60399 -> 127.0.0.1:6650] ``` It's because after apache#218, the `ConsumerImpl::topic_` field becomes `std::shared_ptr` rather than a `std::string` but it is still used to construct the `consumerStr_`. ### Modifications Construct the `consumerStr_` using the `topic` argument in the constructor and make `consumerStr_` const because it is never changed. Now the logs will be like: ``` ConsumerImpl:280 | [persistent://public/default/my-topic, consumer-1, 0] Created consumer on broker [127.0.0.1:60647 -> 127.0.0.1:6650] ```
1 parent 5c77648 commit 33085eb

File tree

4 files changed

+4
-7
lines changed

4 files changed

+4
-7
lines changed

lib/ConsumerImpl.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
8484
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
8585
consumerId_(client->newConsumerId()),
8686
consumerName_(config_.getConsumerName()),
87+
consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "),
8788
messageListenerRunning_(true),
8889
negativeAcksTracker_(client, *this, conf),
8990
readCompacted_(conf.isReadCompacted()),
@@ -92,10 +93,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
9293
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
9394
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
9495
interceptors_(interceptors) {
95-
std::stringstream consumerStrStream;
96-
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
97-
consumerStr_ = consumerStrStream.str();
98-
9996
// Initialize un-ACKed messages OT tracker.
10097
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
10198
if (conf.getTickDurationInMs() > 0) {

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ class ConsumerImpl : public ConsumerImplBase {
215215
const int receiverQueueRefillThreshold_;
216216
uint64_t consumerId_;
217217
std::string consumerName_;
218-
std::string consumerStr_;
218+
const std::string consumerStr_;
219219
int32_t partitionIndex_ = -1;
220220
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
221221
std::atomic_bool messageListenerRunning_;

lib/MessageCrypto.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace pulsar {
2828

2929
DECLARE_LOG_OBJECT()
3030

31-
MessageCrypto::MessageCrypto(std::string& logCtx, bool keyGenNeeded)
31+
MessageCrypto::MessageCrypto(const std::string& logCtx, bool keyGenNeeded)
3232
: dataKeyLen_(32),
3333
dataKey_(new unsigned char[dataKeyLen_]),
3434
tagLen_(16),

lib/MessageCrypto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class MessageCrypto {
4949
typedef std::map<std::string, std::string> StringMap;
5050
typedef std::map<std::string, std::pair<std::string, boost::posix_time::ptime>> DataKeyCacheMap;
5151

52-
MessageCrypto(std::string& logCtx, bool keyGenNeeded);
52+
MessageCrypto(const std::string& logCtx, bool keyGenNeeded);
5353
~MessageCrypto();
5454

5555
/*

0 commit comments

Comments
 (0)