Skip to content

Commit 7cefe0e

Browse files
Fix topic name is shown as a pointer rather than string (apache#331)
### Motivation This is an additional fix to apache#329 because I still observed logs like: ``` Closing consumer for topic 0x6000028e0648 Closing producer for topic 0x600001210b88 ``` It's because `HandlerBase::topic_` field is protected and could be accessed directly from the derived classes. ### Motivation In `HandlerBase`, make `topic_` private and add two methods `topic()` and `getTopicPtr()` to get the reference to the string and the shared pointer. `getTopicPtr()` should only be called when being passed to `MessageImpl::setTopicName`.
1 parent a0f2d32 commit 7cefe0e

File tree

7 files changed

+29
-25
lines changed

7 files changed

+29
-25
lines changed

lib/BatchMessageContainerBase.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
namespace pulsar {
2828

2929
BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& producer)
30-
: topicName_(producer.topic_),
30+
: topicName_(producer.topic()),
3131
producerConfig_(producer.conf_),
3232
producerName_(producer.producerName_),
3333
producerId_(producer.producerId_),

lib/BatchMessageContainerBase.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class BatchMessageContainerBase : public boost::noncopyable {
9090

9191
protected:
9292
// references to ProducerImpl's fields
93-
const std::shared_ptr<std::string> topicName_;
93+
const std::string topicName_;
9494
const ProducerConfiguration& producerConfig_;
9595
const std::string& producerName_;
9696
const uint64_t& producerId_;

lib/ConsumerImpl.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()
174174

175175
const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
176176

177-
const std::string& ConsumerImpl::getTopic() const { return *topic_; }
177+
const std::string& ConsumerImpl::getTopic() const { return topic(); }
178178

179179
void ConsumerImpl::start() {
180180
HandlerBase::start();
@@ -194,7 +194,7 @@ void ConsumerImpl::start() {
194194

195195
// Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
196196
// constructor completed.
197-
if (TopicName::get(*topic_)->isPersistent()) {
197+
if (TopicName::get(topic())->isPersistent()) {
198198
if (config_.getAckGroupingTimeMs() > 0) {
199199
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
200200
connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(),
@@ -249,7 +249,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
249249
ClientImplPtr client = client_.lock();
250250
uint64_t requestId = client->newRequestId();
251251
SharedBuffer cmd = Commands::newSubscribe(
252-
*topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
252+
topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
253253
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
254254
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
255255
config_.getKeySharedPolicy(), config_.getPriorityLevel());
@@ -552,7 +552,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
552552

553553
Message m(messageId, brokerEntryMetadata, metadata, payload);
554554
m.impl_->cnx_ = cnx.get();
555-
m.impl_->setTopicName(topic_);
555+
m.impl_->setTopicName(getTopicPtr());
556556
m.impl_->setRedeliveryCount(msg.redelivery_count());
557557

558558
if (metadata.has_schema_version()) {
@@ -1243,7 +1243,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
12431243
return;
12441244
}
12451245

1246-
LOG_INFO(getName() << "Closing consumer for topic " << topic_);
1246+
LOG_INFO(getName() << "Closing consumer for topic " << topic());
12471247
state_ = Closing;
12481248
incomingMessages_.close();
12491249

@@ -1764,7 +1764,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
17641764
return;
17651765
}
17661766
if (result != ResultOk) {
1767-
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
1767+
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
17681768
<< self->consumerName_ << "} Failed to acknowledge the message {"
17691769
<< originMessageId
17701770
<< "} of the original topic but send to the DLQ successfully : "
@@ -1777,7 +1777,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
17771777
}
17781778
});
17791779
} else {
1780-
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
1780+
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
17811781
<< self->consumerName_ << "} Failed to send DLQ message to {"
17821782
<< self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
17831783
<< "{" << originMessageId << "} : " << res);

lib/HandlerBase.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ DECLARE_LOG_OBJECT()
3030
namespace pulsar {
3131

3232
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
33-
: client_(client),
34-
topic_(std::make_shared<std::string>(topic)),
33+
: topic_(std::make_shared<std::string>(topic)),
34+
client_(client),
3535
executor_(client->getIOExecutorProvider()->get()),
3636
mutex_(),
3737
creationTimestamp_(TimeUtils::now()),
@@ -88,7 +88,7 @@ void HandlerBase::grabCnx() {
8888
return;
8989
}
9090
auto self = shared_from_this();
91-
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
91+
client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
9292
if (result == ResultOk) {
9393
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
9494
connectionOpened(cnx).addListener([this, self](Result result, bool) {

lib/HandlerBase.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,18 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
8787

8888
virtual const std::string& getName() const = 0;
8989

90+
const std::string& topic() const { return *topic_; }
91+
const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }
92+
9093
private:
94+
const std::shared_ptr<std::string> topic_;
95+
9196
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
9297

9398
void handleTimeout(const boost::system::error_code& ec);
9499

95100
protected:
96101
ClientImplWeakPtr client_;
97-
const std::shared_ptr<std::string> topic_;
98102
ExecutorServicePtr executor_;
99103
mutable std::mutex mutex_;
100104
std::mutex pendingReceiveMutex_;

lib/MultiTopicsConsumerImpl.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
7272
interceptors_(interceptors) {
7373
std::stringstream consumerStrStream;
7474
consumerStrStream << "[Muti Topics Consumer: "
75-
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
75+
<< "TopicName - " << topic() << " - Subscription - " << subscriptionName << "]";
7676
consumerStr_ = consumerStrStream.str();
7777

7878
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
@@ -312,7 +312,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
312312
}
313313

314314
void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
315-
LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
315+
LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing");
316316

317317
auto callback = [this, originalCallback](Result result) {
318318
if (result == ResultOk) {
@@ -483,7 +483,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
483483
*numberTopicPartitions_ = 0;
484484
if (consumers.empty()) {
485485
LOG_DEBUG("TopicsConsumer have no consumers to close "
486-
<< " topic" << topic_ << " subscription - " << subscriptionName_);
486+
<< " topic" << topic() << " subscription - " << subscriptionName_);
487487
callback(ResultAlreadyClosed);
488488
return;
489489
}
@@ -518,7 +518,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
518518
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
519519
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
520520
<< " message:" << msg.getDataAsString());
521-
msg.impl_->setTopicName(consumer.impl_->topic_);
521+
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
522522

523523
Lock lock(pendingReceiveMutex_);
524524
if (!pendingReceives_.empty()) {
@@ -744,7 +744,7 @@ Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCrea
744744
}
745745
const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }
746746

747-
const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; }
747+
const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }
748748

749749
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
750750

lib/ProducerImpl.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
5858
partition_(partition),
5959
producerName_(conf_.getProducerName()),
6060
userProvidedProducerName_(false),
61-
producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
61+
producerStr_("[" + topic() + ", " + producerName_ + "] "),
6262
producerId_(client->newProducerId()),
6363
msgSequenceGenerator_(0),
6464
batchTimer_(executor_->createDeadlineTimer()),
@@ -67,7 +67,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
6767
memoryLimitController_(client->getMemoryLimitController()),
6868
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
6969
interceptors_(interceptors) {
70-
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
70+
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
7171
<< " id: " << producerId_);
7272

7373
int64_t initialSequenceId = conf.getInitialSequenceId();
@@ -93,7 +93,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
9393

9494
if (conf_.isEncryptionEnabled()) {
9595
std::ostringstream logCtxStream;
96-
logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
96+
logCtxStream << "[" << topic() << ", " << producerName_ << ", " << producerId_ << "]";
9797
std::string logCtx = logCtxStream.str();
9898
msgCrypto_ = std::make_shared<MessageCrypto>(logCtx, true);
9999
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
@@ -123,7 +123,7 @@ ProducerImpl::~ProducerImpl() {
123123
}
124124
}
125125

126-
const std::string& ProducerImpl::getTopic() const { return *topic_; }
126+
const std::string& ProducerImpl::getTopic() const { return topic(); }
127127

128128
const std::string& ProducerImpl::getProducerName() const { return producerName_; }
129129

@@ -148,7 +148,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
148148
ClientImplPtr client = client_.lock();
149149
int requestId = client->newRequestId();
150150

151-
SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, producerName_, requestId,
151+
SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
152152
conf_.getProperties(), conf_.getSchema(), epoch_,
153153
userProvidedProducerName_, conf_.isEncryptionEnabled(),
154154
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
@@ -218,7 +218,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
218218
cnx->registerProducer(producerId_, shared_from_this());
219219
producerName_ = responseData.producerName;
220220
schemaVersion_ = responseData.schemaVersion;
221-
producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
221+
producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
222222
topicEpoch = responseData.topicEpoch;
223223

224224
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
@@ -788,7 +788,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {
788788

789789
return;
790790
}
791-
LOG_INFO(getName() << "Closing producer for topic " << topic_);
791+
LOG_INFO(getName() << "Closing producer for topic " << topic());
792792
state_ = Closing;
793793

794794
ClientConnectionPtr cnx = getCnx().lock();

0 commit comments

Comments
 (0)