@@ -51,8 +51,6 @@ using proto::BaseCommand;
5151
5252static const uint32_t DefaultBufferSize = 64 * 1024 ;
5353
54- static const int KeepAliveIntervalInSeconds = 30 ;
55-
5654static MessageId toMessageId (const proto::MessageIdData& messageIdData) {
5755 return MessageIdBuilder::from (messageIdData).build ();
5856}
@@ -186,6 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
186184 connectTimeoutTask_(
187185 std::make_shared<PeriodicTask>(*executor_, clientConfiguration.getConnectionTimeout())),
188186 outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
187+ keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()),
189188 consumerStatsRequestTimer_(executor_->createDeadlineTimer ()),
190189 maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
191190 clientVersion_(clientVersion),
@@ -310,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
310309 // Only send keep-alive probes if the broker supports it
311310 keepAliveTimer_ = executor_->createDeadlineTimer ();
312311 if (keepAliveTimer_) {
313- keepAliveTimer_->expires_from_now (std::chrono::seconds (KeepAliveIntervalInSeconds ));
312+ keepAliveTimer_->expires_from_now (std::chrono::seconds (keepAliveIntervalInSeconds_ ));
314313 auto weakSelf = weak_from_this ();
315314 keepAliveTimer_->async_wait ([weakSelf](const ASIO_ERROR&) {
316315 auto self = weakSelf.lock ();
@@ -1245,7 +1244,7 @@ void ClientConnection::handleKeepAliveTimeout() {
12451244 // be zero And we do not attempt to dereference the pointer.
12461245 Lock lock (mutex_);
12471246 if (keepAliveTimer_) {
1248- keepAliveTimer_->expires_from_now (std::chrono::seconds (KeepAliveIntervalInSeconds ));
1247+ keepAliveTimer_->expires_from_now (std::chrono::seconds (keepAliveIntervalInSeconds_ ));
12491248 auto weakSelf = weak_from_this ();
12501249 keepAliveTimer_->async_wait ([weakSelf](const ASIO_ERROR&) {
12511250 auto self = weakSelf.lock ();
0 commit comments